Passed
Push — master ( 7dcdac...1c2ac6 )
by Mihai
28:59 queued 19:04
created

ConsumerTest::prepareAMQPConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 5
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AMQPStreamConnection;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
15
use PHPUnit\Framework\Assert;
16
use PHPUnit\Framework\TestCase;
17
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
19
20
class ConsumerTest extends TestCase
21
{
22
    /**
     * Build the Consumer under test from the given connection and channel.
     *
     * @param mixed $amqpConnection first constructor argument of Consumer (a mock in these tests)
     * @param mixed $amqpChannel    second constructor argument of Consumer (a mock in these tests)
     *
     * @return Consumer
     */
    protected function getConsumer($amqpConnection, $amqpChannel)
    {
        $consumer = new Consumer($amqpConnection, $amqpChannel);

        return $consumer;
    }
26
27
    /**
     * Create a mock of AMQPStreamConnection without running its real constructor.
     *
     * @return \PHPUnit\Framework\MockObject\MockObject
     */
    protected function prepareAMQPConnection()
    {
        $connectionBuilder = $this->getMockBuilder(AMQPStreamConnection::class);
        $connectionBuilder->disableOriginalConstructor();

        return $connectionBuilder->getMock();
    }
33
34
    /**
     * Create a mock of AMQPChannel without running its real constructor.
     *
     * @return \PHPUnit\Framework\MockObject\MockObject
     */
    protected function prepareAMQPChannel()
    {
        $channelBuilder = $this->getMockBuilder(AMQPChannel::class);
        $channelBuilder->disableOriginalConstructor();

        return $channelBuilder->getMock();
    }
40
41
    /**
     * Check if the message is requeued or not correctly.
     *
     * The consumer callback returns $processFlag; the channel mock then checks
     * which acknowledgement method (if any) the consumer invokes for the message.
     *
     * @dataProvider processMessageProvider
     *
     * @param mixed       $processFlag     value returned by the consumer callback
     * @param string|null $expectedMethod  channel method that may be called, or null when none may be called
     * @param bool|null   $expectedRequeue requeue flag expected to be passed to basic_reject
     */
    public function testProcessMessage($processFlag, $expectedMethod = null, $expectedRequeue = null)
    {
        $amqpConnection = $this->prepareAMQPConnection();
        $amqpChannel = $this->prepareAMQPChannel();
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);

        // Create a callback function with a return value set by the data provider.
        $consumer->setCallback(function () use ($processFlag) {
            return $processFlag;
        });

        // Create a default message
        $amqpMessage = new AMQPMessage('foo body');
        $amqpMessage->setChannel($amqpChannel);
        $amqpMessage->setDeliveryTag(0);

        if ($expectedMethod) {
            $amqpChannel->expects($this->any())
                ->method('basic_reject')
                ->willReturnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
                    Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
                    // Expected value goes first so a failure message reports the right side as "actual".
                    Assert::assertSame($expectedRequeue, $requeue); // Check if the message should be requeued.
                });

            $amqpChannel->expects($this->any())
                ->method('basic_ack')
                ->willReturnCallback(function () use ($expectedMethod) {
                    Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
                });
        } else {
            // No acknowledgement of any kind may be sent by the consumer.
            $amqpChannel->expects($this->never())->method('basic_reject');
            $amqpChannel->expects($this->never())->method('basic_ack');
            $amqpChannel->expects($this->never())->method('basic_nack');
        }

        $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        $consumer->setEventDispatcher($eventDispatcher);

        // The "before" event must be dispatched first, then the "after" event.
        $eventDispatcher->expects($this->atLeastOnce())
            ->method('dispatch')
            ->withConsecutive(
                [new BeforeProcessingMessageEvent($consumer, $amqpMessage), BeforeProcessingMessageEvent::NAME],
                [new AfterProcessingMessageEvent($consumer, $amqpMessage), AfterProcessingMessageEvent::NAME]
            )
            ->willReturnOnConsecutiveCalls(
                new BeforeProcessingMessageEvent($consumer, $amqpMessage),
                new AfterProcessingMessageEvent($consumer, $amqpMessage)
            );

        $consumer->processMessage($amqpMessage);
    }
100
101
    /**
     * Data sets for testProcessMessage().
     *
     * Each entry is [processFlag, expectedMethod (optional), expectedRequeue (optional)].
     *
     * @return array
     */
    public function processMessageProvider()
    {
        return [
            // Remove message from queue only if callback return not false
            [null, 'basic_ack'],
            // Remove message from queue only if callback return not false
            [true, 'basic_ack'],
            // Reject and requeue message to RabbitMQ
            [false, 'basic_reject', true],
            // Remove message from queue only if callback return not false
            [ConsumerInterface::MSG_ACK, 'basic_ack'],
            // Reject and requeue message to RabbitMQ
            [ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true],
            // Reject and drop
            [ConsumerInterface::MSG_REJECT, 'basic_reject', false],
            // ack not sent by the consumer but should be sent by the implementer of ConsumerInterface
            [ConsumerInterface::MSG_ACK_SENT],
        ];
    }
113
114
    /**
     * Data sets for testConsume(), keyed by a descriptive name.
     *
     * Each data set holds a single argument: an array with a "messages" list.
     *
     * @return array
     */
    public function consumeProvider()
    {
        // Returned as one literal so no variable is written to before it is initialised.
        return [
            'All ok 4 callbacks' => [
                [
                    'messages' => [
                        'msgCallback1',
                        'msgCallback2',
                        'msgCallback3',
                        'msgCallback4',
                    ],
                ],
            ],
            'No callbacks' => [
                [
                    'messages' => [],
                ],
            ],
        ];
    }
138
139
    /**
     * Run consume(1) against a mocked channel and verify the calls made on the
     * channel and on the event dispatcher.
     *
     * @dataProvider consumeProvider
     *
     * @param array $data data set containing a "messages" list
     */
    public function testConsume(array $data)
    {
        $expectedMessages = count($data['messages']);

        // set up amqp connection
        $connection = $this->prepareAMQPConnection();

        // set up amqp channel
        $channel = $this->prepareAMQPChannel();
        $channel->expects(self::atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects(self::once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // NOTE(review): the merged array is handed over as ONE argument, not spread,
        // so the number of messages does not influence the expected call count
        // (exactly 2) below. Presumably `...$consumingStates` was intended, but the
        // call-count expectations in this test would have to change with it — confirm.
        $consumingStates = array_merge(array_fill(0, $expectedMessages, true), [false]);
        $channel->expects(self::exactly(2))
            ->method('is_consuming')
            ->willReturnOnConsecutiveCalls($consumingStates);

        // set up consumer
        $consumer = $this->getConsumer($connection, $channel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);

        // wait() must be called exactly once, with the consumer's idle timeout as third argument.
        $channel->expects(self::exactly(1))
            ->method('wait')
            ->with(null, false, $consumer->getIdleTimeout())
            ->willReturn(true);

        $dispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        // Exactly one OnConsumeEvent is expected to be dispatched.
        $dispatcher->expects(self::exactly(1))
            ->method('dispatch')
            ->with(self::isInstanceOf(OnConsumeEvent::class), OnConsumeEvent::NAME)
            ->willReturn(self::isInstanceOf(OnConsumeEvent::class));

        $consumer->setEventDispatcher($dispatcher);
        $consumer->consume(1);
    }
195
196
    /**
     * consume() must return the configured idle-timeout exit code when wait()
     * times out after the idle timeout has elapsed.
     */
    public function testIdleTimeoutExitCode()
    {
        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        $amqpChannel
            ->expects($this->any())
            ->method('is_consuming')
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $consumer->setIdleTimeout(60);
        $consumer->setIdleTimeoutExitCode(2);

        $amqpChannel->expects($this->exactly(1))
            ->method('wait')
            ->with(null, false, $consumer->getIdleTimeout())
            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
                // simulate time passing by moving the last activity date time
                $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
                throw new AMQPTimeoutException();
            });

        // assertEquals reports the actual exit code on failure, unlike assertTrue(2 == ...).
        $this->assertEquals(2, $consumer->consume(1));
    }
234
235
    /**
     * An OnIdleEvent listener that clears the force-stop flag lets consumption
     * continue; once a later listener sets the flag, consume() is expected to
     * throw AMQPTimeoutException.
     */
    public function testShouldAllowContinueConsumptionAfterIdleTimeout()
    {
        // set up amqp connection
        $connection = $this->prepareAMQPConnection();

        // set up amqp channel
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        $channel->expects($this->any())
            ->method('is_consuming')
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($connection, $channel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);
        $consumer->setIdleTimeout(2);

        // Every wait() call times out after pretending the idle timeout has elapsed.
        $timeOut = function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
            // simulate time passing by moving the last activity date time
            $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
            throw new AMQPTimeoutException();
        };
        $channel->expects($this->exactly(2))
            ->method('wait')
            ->with(null, false, $consumer->getIdleTimeout())
            ->willReturnCallback($timeOut);

        $dispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        // First idle event: ask the consumer to keep going.
        $continueOnIdle = $this->callback(function (OnIdleEvent $event) {
            $event->setForceStop(false);

            return true;
        });
        // Second idle event: ask the consumer to stop.
        $stopOnIdle = $this->callback(function (OnIdleEvent $event) {
            $event->setForceStop(true);

            return true;
        });

        $dispatcher->expects($this->exactly(4))
            ->method('dispatch')
            ->withConsecutive(
                [$this->isInstanceOf(OnConsumeEvent::class), OnConsumeEvent::NAME],
                [$continueOnIdle, OnIdleEvent::NAME],
                [$this->isInstanceOf(OnConsumeEvent::class), OnConsumeEvent::NAME],
                [$stopOnIdle, OnIdleEvent::NAME]
            );

        $consumer->setEventDispatcher($dispatcher);

        $this->expectException(AMQPTimeoutException::class);
        $consumer->consume(10);
    }
308
    // TODO: the test below is commented out; work out the logic it is meant to verify, fix it, and re-enable it.
309
    //    public function testGracefulMaxExecutionTimeoutExitCode()
310
    //    {
311
    //        // set up amqp connection
312
    //        $amqpConnection = $this->prepareAMQPConnection();
313
    //        // set up amqp channel
314
    //        $amqpChannel = $this->prepareAMQPChannel();
315
    //        $amqpChannel->expects($this->atLeastOnce())
316
    //            ->method('getChannelId')
317
    //            ->with()
318
    //            ->willReturn(true);
319
    //        $amqpChannel->expects($this->once())
320
    //            ->method('basic_consume')
321
    //            ->withAnyParameters()
322
    //            ->willReturn(true);
323
    //        $amqpChannel
324
    //            ->expects($this->any())
325
    //            ->method('is_consuming')
326
    //            ->willReturn(true);
327
    //
328
    //        // set up consumer
329
    //        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
330
    //        // disable autosetup fabric so we do not mock more objects
331
    //        $consumer->disableAutoSetupFabric();
332
    //        $consumer->setChannel($amqpChannel);
333
    //        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
334
    //        $consumer->setGracefulMaxExecutionTimeoutExitCode(10);
335
    //
336
    //        $amqpChannel->expects($this->exactly(1))
337
    //            ->method('wait')
338
    //            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
339
    //                // simulate time passing by moving the max execution date time
340
    //                $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1);
341
    //                throw new AMQPTimeoutException();
342
    //            });
343
    //
344
    //        $this->assertSame(10, $consumer->consume(1));
345
    //    }
346
347
    /**
     * With the graceful max execution time set 0 seconds in the future,
     * consume() must never call wait() on the channel.
     */
    public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
    {
        // set up amqp connection
        $connection = $this->prepareAMQPConnection();

        // set up amqp channel
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        $channel->expects($this->any())
            ->method('is_consuming')
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($connection, $channel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);

        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(0);

        // The deadline is already reached, so no wait may happen at all.
        $channel->expects($this->never())->method('wait');

        $consumer->consume(1);
    }
380
381
    /**
     * wait() must be called with a timeout no larger than the configured
     * timeoutWait (30) while the graceful max execution time (60) and the idle
     * timeout (50) are both larger; two such calls are expected.
     */
    public function testTimeoutWait()
    {
        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        $amqpChannel
            ->expects($this->any())
            ->method('is_consuming')
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $consumer->setTimeoutWait(30);
        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
        $consumer->setIdleTimeout(50);

        $amqpChannel->expects($this->exactly(2))
            ->method('wait')
            // Constraint factory spelled with its declared casing (was "LessThanOrEqual").
            ->with(null, false, $this->lessThanOrEqual($consumer->getTimeoutWait()))
            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
                // ensure max execution date time "counts down"
                $consumer->setGracefulMaxExecutionDateTime(
                    $consumer->getGracefulMaxExecutionDateTime()->modify("-$waitTimeout seconds")
                );
                // ensure last activity just occurred so idle timeout is not reached
                $consumer->setLastActivityDateTime(new \DateTime());
                throw new AMQPTimeoutException();
            });

        $consumer->consume(1);
    }
424
    // TODO: the test below is commented out; work out the logic it is meant to verify, fix it, and re-enable it.
425
    //    public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout()
426
    //    {
427
    //        // set up amqp connection
428
    //        $amqpConnection = $this->prepareAMQPConnection();
429
    //        // set up amqp channel
430
    //        $amqpChannel = $this->prepareAMQPChannel();
431
    //        $amqpChannel->expects($this->atLeastOnce())
432
    //            ->method('getChannelId')
433
    //            ->with()
434
    //            ->willReturn(true);
435
    //        $amqpChannel->expects($this->once())
436
    //            ->method('basic_consume')
437
    //            ->withAnyParameters()
438
    //            ->willReturn(true);
439
    //        $amqpChannel
440
    //            ->expects($this->any())
441
    //            ->method('is_consuming')
442
    //            ->willReturn(true);
443
    //
444
    //        // set up consumer
445
    //        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
446
    //        // disable autosetup fabric so we do not mock more objects
447
    //        $consumer->disableAutoSetupFabric();
448
    //        $consumer->setChannel($amqpChannel);
449
    //        $consumer->setTimeoutWait(20);
450
    //
451
    //        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(10);
452
    //
453
    //        $amqpChannel->expects($this->once())
454
    //            ->method('wait')
455
    //            ->with(null, false, $consumer->getGracefulMaxExecutionDateTime()->diff(new \DateTime())->s)
456
    //            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
457
    //                // simulate time passing by moving the max execution date time
458
    //                $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1);
459
    //                throw new AMQPTimeoutException();
460
    //            });
461
    //
462
    //        $consumer->consume(1);
463
    //    }
464
465
    /**
     * With timeoutWait (20) larger than the idle timeout (10), wait() must be
     * called once with 10 as its timeout, and consume() must return the
     * configured idle-timeout exit code (2).
     */
    public function testTimeoutWaitWontWaitPastIdleTimeout()
    {
        // set up amqp connection
        $connection = $this->prepareAMQPConnection();

        // set up amqp channel
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        $channel->expects($this->any())
            ->method('is_consuming')
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($connection, $channel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);
        $consumer->setTimeoutWait(20);
        $consumer->setIdleTimeout(10);
        $consumer->setIdleTimeoutExitCode(2);

        $timeOut = function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
            // simulate time passing by moving the last activity date time
            $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
            throw new AMQPTimeoutException();
        };
        $channel->expects($this->once())
            ->method('wait')
            ->with(null, false, 10)
            ->willReturnCallback($timeOut);

        $exitCode = $consumer->consume(1);

        $this->assertEquals(2, $exitCode);
    }
504
}
505