|
@@ -49,7 +49,7 @@ discard block |
|
|
block discarded – undo |
|
49
|
49
|
$amqpChannel = $this->prepareAMQPChannel(); |
|
50
|
50
|
$consumer = $this->getConsumer($amqpConnection, $amqpChannel); |
|
51
|
51
|
|
|
52
|
|
- $callbackFunction = function () use ($processFlag) { |
|
|
52
|
+ $callbackFunction = function() use ($processFlag) { |
|
53
|
53
|
return $processFlag; |
|
54
|
54
|
}; // Create a callback function with a return value set by the data provider. |
|
55
|
55
|
$consumer->setCallback($callbackFunction); |
|
@@ -62,14 +62,14 @@ discard block |
|
|
block discarded – undo |
|
62
|
62
|
if ($expectedMethod) { |
|
63
|
63
|
$amqpChannel->expects($this->any()) |
|
64
|
64
|
->method('basic_reject') |
|
65
|
|
- ->will($this->returnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) { |
|
|
65
|
+ ->will($this->returnCallback(function($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) { |
|
66
|
66
|
Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called. |
|
67
|
67
|
Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued. |
|
68
|
68
|
})); |
|
69
|
69
|
|
|
70
|
70
|
$amqpChannel->expects($this->any()) |
|
71
|
71
|
->method('basic_ack') |
|
72
|
|
- ->will($this->returnCallback(function ($delivery_tag) use ($expectedMethod) { |
|
|
72
|
+ ->will($this->returnCallback(function($delivery_tag) use ($expectedMethod) { |
|
73
|
73
|
Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called. |
|
74
|
74
|
})); |
|
75
|
75
|
} else { |
|
@@ -174,7 +174,7 @@ discard block |
|
|
block discarded – undo |
|
174
|
174
|
->with(null, false, $consumer->getIdleTimeout()) |
|
175
|
175
|
->will( |
|
176
|
176
|
$this->returnCallback( |
|
177
|
|
- function () use ($amqpChannel) { |
|
|
177
|
+ function() use ($amqpChannel) { |
|
178
|
178
|
/** remove an element on each loop like ... simulate an ACK */ |
|
179
|
179
|
array_splice($amqpChannel->callbacks, 0, 1); |
|
180
|
180
|
}) |
|
@@ -220,7 +220,7 @@ discard block |
|
|
block discarded – undo |
|
220
|
220
|
$amqpChannel->expects($this->exactly(1)) |
|
221
|
221
|
->method('wait') |
|
222
|
222
|
->with(null, false, $consumer->getIdleTimeout()) |
|
223
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
223
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
224
|
224
|
// simulate time passing by moving the last activity date time |
|
225
|
225
|
$consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds")); |
|
226
|
226
|
throw new AMQPTimeoutException(); |
|
@@ -255,7 +255,7 @@ discard block |
|
|
block discarded – undo |
|
255
|
255
|
$amqpChannel->expects($this->exactly(2)) |
|
256
|
256
|
->method('wait') |
|
257
|
257
|
->with(null, false, $consumer->getIdleTimeout()) |
|
258
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
258
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
259
|
259
|
// simulate time passing by moving the last activity date time |
|
260
|
260
|
$consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds")); |
|
261
|
261
|
throw new AMQPTimeoutException(); |
|
@@ -268,7 +268,7 @@ discard block |
|
|
block discarded – undo |
|
268
|
268
|
$eventDispatcher->expects($this->at(1)) |
|
269
|
269
|
->method('dispatch') |
|
270
|
270
|
->with($this->isInstanceOf(OnIdleEvent::class), OnIdleEvent::NAME) |
|
271
|
|
- ->willReturnCallback(function (OnIdleEvent $event, $eventName) { |
|
|
271
|
+ ->willReturnCallback(function(OnIdleEvent $event, $eventName) { |
|
272
|
272
|
$event->setForceStop(false); |
|
273
|
273
|
|
|
274
|
274
|
return $event; |
|
@@ -277,7 +277,7 @@ discard block |
|
|
block discarded – undo |
|
277
|
277
|
$eventDispatcher->expects($this->at(3)) |
|
278
|
278
|
->method('dispatch') |
|
279
|
279
|
->with($this->isInstanceOf(OnIdleEvent::class), OnIdleEvent::NAME) |
|
280
|
|
- ->willReturnCallback(function (OnIdleEvent $event, $eventName) { |
|
|
280
|
+ ->willReturnCallback(function(OnIdleEvent $event, $eventName) { |
|
281
|
281
|
$event->setForceStop(true); |
|
282
|
282
|
|
|
283
|
283
|
return $event; |
|
@@ -316,7 +316,7 @@ discard block |
|
|
block discarded – undo |
|
316
|
316
|
|
|
317
|
317
|
$amqpChannel->expects($this->exactly(1)) |
|
318
|
318
|
->method('wait') |
|
319
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
319
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
320
|
320
|
// simulate time passing by moving the max execution date time |
|
321
|
321
|
$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1); |
|
322
|
322
|
throw new AMQPTimeoutException(); |
|
@@ -383,8 +383,8 @@ discard block |
|
|
block discarded – undo |
|
383
|
383
|
|
|
384
|
384
|
$amqpChannel->expects($this->exactly(2)) |
|
385
|
385
|
->method('wait') |
|
386
|
|
- ->with(null, false, $this->LessThanOrEqual($consumer->getTimeoutWait()) ) |
|
387
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
386
|
+ ->with(null, false, $this->LessThanOrEqual($consumer->getTimeoutWait())) |
|
|
387
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
388
|
388
|
// ensure max execution date time "counts down" |
|
389
|
389
|
$consumer->setGracefulMaxExecutionDateTime( |
|
390
|
390
|
$consumer->getGracefulMaxExecutionDateTime()->modify("-$waitTimeout seconds") |
|
@@ -425,7 +425,7 @@ discard block |
|
|
block discarded – undo |
|
425
|
425
|
$amqpChannel->expects($this->once()) |
|
426
|
426
|
->method('wait') |
|
427
|
427
|
->with(null, false, $consumer->getGracefulMaxExecutionDateTime()->diff(new \DateTime())->s) |
|
428
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
428
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
429
|
429
|
// simulate time passing by moving the max execution date time |
|
430
|
430
|
$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($waitTimeout * -1); |
|
431
|
431
|
throw new AMQPTimeoutException(); |
|
@@ -463,7 +463,7 @@ discard block |
|
|
block discarded – undo |
|
463
|
463
|
$amqpChannel->expects($this->once()) |
|
464
|
464
|
->method('wait') |
|
465
|
465
|
->with(null, false, 10) |
|
466
|
|
- ->willReturnCallback(function ($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
|
466
|
+ ->willReturnCallback(function($allowedMethods, $nonBlocking, $waitTimeout) use ($consumer) { |
|
467
|
467
|
// simulate time passing by moving the last activity date time |
|
468
|
468
|
$consumer->setLastActivityDateTime(new \DateTime("-$waitTimeout seconds")); |
|
469
|
469
|
throw new AMQPTimeoutException(); |