ConsumerTest   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 453
Duplicated Lines 0 %

Importance

Changes 10
Bugs 2 Features 2
Metric Value
eloc 269
dl 0
loc 453
rs 10
c 10
b 2
f 2
wmc 15

14 Methods

Rating   Name   Duplication   Size   Complexity  
A prepareAMQPConnection() 0 5 1
A consumeProvider() 0 20 1
A getConsumer() 0 3 1
A processMessageProvider() 0 10 1
A testProcessMessage() 0 53 2
A prepareAMQPChannel() 0 5 1
A testConsume() 0 50 1
A testTimeoutWaitWontWaitPastIdleTimeout() 0 36 1
A testTimeoutWait() 0 40 1
A testIdleTimeoutExitCode() 0 34 1
A testGracefulMaxExecutionWontWaitIfPastTheTimeout() 0 28 1
A testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout() 0 35 1
A testGracefulMaxExecutionTimeoutExitCode() 0 34 1
A testShouldAllowContinueConsumptionAfterIdleTimeout() 0 58 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AMQPStreamConnection;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
15
use PHPUnit\Framework\Assert;
16
use PHPUnit\Framework\TestCase;
17
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
19
20
class ConsumerTest extends TestCase
21
{
22
    /**
     * Build the consumer under test.
     *
     * @param mixed $amqpConnection connection object handed to the consumer (a mock in these tests)
     * @param mixed $amqpChannel    channel object handed to the consumer (a mock in these tests)
     *
     * @return Consumer
     */
    protected function getConsumer($amqpConnection, $amqpChannel)
    {
        $consumer = new Consumer($amqpConnection, $amqpChannel);

        return $consumer;
    }
26
27
    /**
     * Create a mocked AMQP stream connection whose original constructor is not run.
     *
     * @return \PHPUnit\Framework\MockObject\MockObject
     */
    protected function prepareAMQPConnection()
    {
        $builder = $this->getMockBuilder(AMQPStreamConnection::class);
        $builder->disableOriginalConstructor();

        return $builder->getMock();
    }
33
34
    /**
     * Create a mocked AMQP channel whose original constructor is not run.
     *
     * @return \PHPUnit\Framework\MockObject\MockObject
     */
    protected function prepareAMQPChannel()
    {
        $builder = $this->getMockBuilder(AMQPChannel::class);
        $builder->disableOriginalConstructor();

        return $builder->getMock();
    }
40
41
    /**
     * Check that the message is acked, rejected or requeued according to the
     * value returned by the consumer callback.
     *
     * @dataProvider processMessageProvider
     *
     * @param mixed       $processFlag     value returned by the consumer callback
     * @param string|null $expectedMethod  channel method expected to be invoked; null when no ack/reject/nack may be sent
     * @param bool|null   $expectedRequeue requeue flag expected on basic_reject
     */
    public function testProcessMessage($processFlag, $expectedMethod = null, $expectedRequeue = null)
    {
        $amqpConnection = $this->prepareAMQPConnection();
        $amqpChannel = $this->prepareAMQPChannel();
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);

        // The callback simply returns the flag supplied by the data provider.
        $consumer->setCallback(function () use ($processFlag) {
            return $processFlag;
        });

        // Create a default message bound to the mocked channel.
        // NOTE(review): static analysis reports AMQPMessage::$delivery_info as deprecated;
        // it is kept because the replacement API depends on the php-amqplib version in use.
        $amqpMessage = new AMQPMessage('foo body');
        $amqpMessage->delivery_info['channel'] = $amqpChannel;
        $amqpMessage->delivery_info['delivery_tag'] = 0;

        if ($expectedMethod) {
            // any() does not force a call; the assertions inside the callbacks fail
            // the test when a channel method other than the expected one is invoked.
            $amqpChannel->expects($this->any())
                ->method('basic_reject')
                ->willReturnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
                    Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
                    Assert::assertSame($expectedRequeue, $requeue); // Check if the message should be requeued.
                });

            $amqpChannel->expects($this->any())
                ->method('basic_ack')
                ->willReturnCallback(function () use ($expectedMethod) {
                    Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
                });
        } else {
            // No expected method: the consumer must not ack, reject or nack by itself.
            $amqpChannel->expects($this->never())->method('basic_reject');
            $amqpChannel->expects($this->never())->method('basic_ack');
            $amqpChannel->expects($this->never())->method('basic_nack');
        }

        $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        $consumer->setEventDispatcher($eventDispatcher);

        // The "before" event must be dispatched first, followed by the "after" event.
        $eventDispatcher->expects($this->atLeastOnce())
            ->method('dispatch')
            ->withConsecutive(
                array(new BeforeProcessingMessageEvent($consumer, $amqpMessage), BeforeProcessingMessageEvent::NAME),
                array(new AfterProcessingMessageEvent($consumer, $amqpMessage), AfterProcessingMessageEvent::NAME)
            )
            ->willReturnOnConsecutiveCalls(
                new BeforeProcessingMessageEvent($consumer, $amqpMessage),
                new AfterProcessingMessageEvent($consumer, $amqpMessage)
            );

        $consumer->processMessage($amqpMessage);
    }
100
101
    /**
     * Data sets for testProcessMessage().
     *
     * Each entry is: callback return value, expected channel method (optional),
     * expected requeue flag (optional).
     *
     * @return array
     */
    public function processMessageProvider()
    {
        return [
            // Callback result that is not false: the message is acked.
            [null, 'basic_ack'],
            [true, 'basic_ack'],
            // false: the message is rejected and requeued.
            [false, 'basic_reject', true],
            // Explicit flags from ConsumerInterface.
            [ConsumerInterface::MSG_ACK, 'basic_ack'],
            [ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true],
            // Reject and drop.
            [ConsumerInterface::MSG_REJECT, 'basic_reject', false],
            // Ack is not sent by the consumer; the ConsumerInterface implementer is expected to send it.
            [ConsumerInterface::MSG_ACK_SENT],
        ];
    }
113
114
    /**
     * Data sets for testConsume().
     *
     * Each data set passes a single array argument whose "messages" key lists
     * the pending callbacks placed on the mocked channel.
     *
     * @return array
     */
    public function consumeProvider()
    {
        // Built as one literal so the result never depends on an uninitialised variable.
        return [
            'All ok 4 callbacks' => [
                [
                    'messages' => [
                        'msgCallback1',
                        'msgCallback2',
                        'msgCallback3',
                        'msgCallback4',
                    ],
                ],
            ],
            'No callbacks' => [
                [
                    'messages' => [],
                ],
            ],
        ];
    }
138
139
    /**
     * Consume the messages supplied by the data provider and check that wait()
     * is called, and an OnConsumeEvent dispatched, once per pending callback.
     *
     * @dataProvider consumeProvider
     *
     * @param array $data data set holding the pending callbacks under the "messages" key
     */
    public function testConsume(array $data)
    {
        $consumerCallBacks = $data['messages'];
        $expectedIterations = count($consumerCallBacks);

        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $amqpChannel->callbacks = $consumerCallBacks;

        /**
         * Mock wait method and use a callback to remove one element each time from callbacks
         * This will simulate a basic consumer consume with provided messages count
         */
        $amqpChannel->expects($this->exactly($expectedIterations))
            ->method('wait')
            ->with(null, false, $consumer->getIdleTimeout())
            ->willReturnCallback(function () use ($amqpChannel) {
                /** remove an element on each loop like ... simulate an ACK */
                array_splice($amqpChannel->callbacks, 0, 1);
            });

        $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        // Return the dispatched event itself (argument 0) rather than a PHPUnit
        // constraint object, so the stub hands back an event as its return value.
        $eventDispatcher->expects($this->exactly($expectedIterations))
            ->method('dispatch')
            ->with($this->isInstanceOf(OnConsumeEvent::class), OnConsumeEvent::NAME)
            ->willReturnArgument(0);

        $consumer->setEventDispatcher($eventDispatcher);
        $consumer->consume(1);
    }
195
196
    /**
     * consume() must return the configured idle-timeout exit code when wait()
     * times out after the idle timeout has elapsed.
     */
    public function testIdleTimeoutExitCode()
    {
        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $consumer->setIdleTimeout(60);
        $consumer->setIdleTimeoutExitCode(2);
        $amqpChannel->callbacks = array('idle_timeout_exit_code');

        $amqpChannel->expects($this->exactly(1))
            ->method('wait')
            ->with(null, false, $consumer->getIdleTimeout())
            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
                // simulate time passing by moving the last activity date time
                $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
                throw new AMQPTimeoutException();
            });

        // assertEquals reports the actual exit code on failure, unlike assertTrue(2 == ...).
        $this->assertEquals(2, $consumer->consume(1));
    }
231
232
    /**
     * An OnIdleEvent listener that clears the force-stop flag lets consumption
     * continue after an idle timeout; once a listener sets the flag, the
     * AMQPTimeoutException is expected to propagate out of consume().
     */
    public function testShouldAllowContinueConsumptionAfterIdleTimeout()
    {
        // Mocked connection and channel.
        $connection = $this->prepareAMQPConnection();
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // Consumer under test; fabric auto-setup is disabled so no further mocks are needed.
        $consumer = $this->getConsumer($connection, $channel);
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);
        $consumer->setIdleTimeout(2);
        $channel->callbacks = array('idle_timeout_exit_code');

        // Every wait() call times out after pretending the idle period has fully elapsed.
        $timeOut = function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
            $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
            throw new AMQPTimeoutException();
        };
        $idleTimeout = $consumer->getIdleTimeout();
        $channel->expects($this->exactly(2))
            ->method('wait')
            ->with(null, false, $idleTimeout)
            ->willReturnCallback($timeOut);

        $dispatcher = $this->getMockBuilder(EventDispatcherInterface::class)
            ->disableOriginalConstructor()
            ->getMock();

        // Listener used for the invocation at index 1: keep consuming.
        $keepConsuming = function (OnIdleEvent $event, $eventName) {
            $event->setForceStop(false);

            return $event;
        };
        // Listener used for the invocation at index 3: request a stop.
        $requestStop = function (OnIdleEvent $event, $eventName) {
            $event->setForceStop(true);

            return $event;
        };

        $dispatcher->expects($this->at(1))
            ->method('dispatch')
            ->with($this->isInstanceOf(OnIdleEvent::class), OnIdleEvent::NAME)
            ->willReturnCallback($keepConsuming);

        $dispatcher->expects($this->at(3))
            ->method('dispatch')
            ->with($this->isInstanceOf(OnIdleEvent::class), OnIdleEvent::NAME)
            ->willReturnCallback($requestStop);

        $consumer->setEventDispatcher($dispatcher);

        $this->expectException(AMQPTimeoutException::class);
        $consumer->consume(10);
    }
291
292
    /**
     * consume() must return the configured graceful-max-execution exit code
     * when wait() times out after the maximum execution date time has passed.
     */
    public function testGracefulMaxExecutionTimeoutExitCode()
    {
        // Mocked connection and channel.
        $connection = $this->prepareAMQPConnection();
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // Consumer under test; fabric auto-setup is disabled so no further mocks are needed.
        $consumer = $this->getConsumer($connection, $channel);
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);

        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
        $consumer->setGracefulMaxExecutionTimeoutExitCode(10);
        $channel->callbacks = array('graceful_max_execution_timeout_test');

        // The single wait() call shifts the max execution date time by the wait
        // timeout to simulate time passing, then times out.
        $expireAndTimeOut = function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
            $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1);
            throw new AMQPTimeoutException();
        };
        $channel->expects($this->once())
            ->method('wait')
            ->willReturnCallback($expireAndTimeOut);

        $exitCode = $consumer->consume(1);

        $this->assertSame(10, $exitCode);
    }
327
328
    /**
     * With the graceful max execution date time set zero seconds in the future,
     * consume() must never call wait() on the channel.
     */
    public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
    {
        // Mocked connection and channel.
        $connection = $this->prepareAMQPConnection();
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);
        // The assertion of this test: no waiting at all.
        $channel->expects($this->never())
            ->method('wait');

        // Consumer under test; fabric auto-setup is disabled so no further mocks are needed.
        $consumer = $this->getConsumer($connection, $channel);
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);

        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(0);
        $channel->callbacks = array('graceful_max_execution_timeout_test');

        $consumer->consume(1);
    }
357
358
    /**
     * wait() must never be called with a timeout larger than the configured
     * timeout-wait value, and is expected exactly twice before consume() ends.
     */
    public function testTimeoutWait()
    {
        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $consumer->setTimeoutWait(30);
        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
        $consumer->setIdleTimeout(50);

        $amqpChannel->callbacks = array('timeout_wait_test');

        $amqpChannel->expects($this->exactly(2))
            ->method('wait')
            // Constraint factory spelled with its declared casing (lessThanOrEqual).
            ->with(null, false, $this->lessThanOrEqual($consumer->getTimeoutWait()))
            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
                // ensure max execution date time "counts down"
                $consumer->setGracefulMaxExecutionDateTime(
                    $consumer->getGracefulMaxExecutionDateTime()->modify("-$waitTimeout seconds")
                );
                // ensure last activity just occurred so idle timeout is not reached
                $consumer->setLastActivityDateTime(new \DateTime());
                throw new AMQPTimeoutException();
            });

        $consumer->consume(1);
    }
399
400
    /**
     * When the graceful max execution limit (10s) is closer than the configured
     * timeout-wait (20s), wait() must be called with a timeout that does not
     * exceed the time left until that limit.
     */
    public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout()
    {
        // set up amqp connection
        $amqpConnection = $this->prepareAMQPConnection();
        // set up amqp channel
        $amqpChannel = $this->prepareAMQPChannel();
        $amqpChannel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $amqpChannel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // set up consumer
        $consumer = $this->getConsumer($amqpConnection, $amqpChannel);
        // disable autosetup fabric so we do not mock more objects
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($amqpChannel);
        $consumer->setTimeoutWait(20);

        $gracefulMaxExecutionSeconds = 10;
        $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($gracefulMaxExecutionSeconds);
        $amqpChannel->callbacks = array('graceful_max_execution_timeout_test');

        $amqpChannel->expects($this->once())
            ->method('wait')
            // Bound the timeout instead of matching an exact wall-clock diff taken during
            // setup, which could differ from the value the consumer computes later.
            ->with(null, false, $this->lessThanOrEqual($gracefulMaxExecutionSeconds))
            ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
                // simulate time passing by moving the max execution date time
                $consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1);
                throw new AMQPTimeoutException();
            });

        $consumer->consume(1);
    }
436
437
    /**
     * When the idle timeout (10s) is shorter than the configured timeout-wait
     * (20s), wait() must be called with the idle timeout, and consume() must
     * return the idle-timeout exit code once that wait times out.
     */
    public function testTimeoutWaitWontWaitPastIdleTimeout()
    {
        // Mocked connection and channel.
        $connection = $this->prepareAMQPConnection();
        $channel = $this->prepareAMQPChannel();
        $channel->expects($this->atLeastOnce())
            ->method('getChannelId')
            ->with()
            ->willReturn(true);
        $channel->expects($this->once())
            ->method('basic_consume')
            ->withAnyParameters()
            ->willReturn(true);

        // Consumer under test; fabric auto-setup is disabled so no further mocks are needed.
        $consumer = $this->getConsumer($connection, $channel);
        $consumer->disableAutoSetupFabric();
        $consumer->setChannel($channel);
        $consumer->setTimeoutWait(20);
        $consumer->setIdleTimeout(10);
        $consumer->setIdleTimeoutExitCode(2);

        $channel->callbacks = array('idle_timeout_test');

        // The single wait() call moves the last activity back by the wait timeout
        // to simulate time passing, then times out.
        $idleAndTimeOut = function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) {
            $consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds"));
            throw new AMQPTimeoutException();
        };
        $channel->expects($this->once())
            ->method('wait')
            ->with(null, false, 10)
            ->willReturnCallback($idleAndTimeOut);

        $exitCode = $consumer->consume(1);

        $this->assertEquals(2, $exitCode);
    }
474
}
475