Passed
Pull Request — 1.x (#35)
by
unknown
01:33
created

unblocking_processes_existing_messages_on_queue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
eloc 8
c 2
b 0
f 0
nc 1
nop 0
dl 0
loc 14
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Tests;
4
5
use PHPUnit\Framework\AssertionFailedError;
6
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\HttpKernel\KernelInterface;
9
use Symfony\Component\Messenger\Envelope;
10
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Stamp\DelayStamp;
13
use Symfony\Component\Messenger\Stamp\SerializerStamp;
14
use Zenstruck\Assert;
15
use Zenstruck\Assert\AssertionFailed;
16
use Zenstruck\Messenger\Test\InteractsWithMessenger;
17
use Zenstruck\Messenger\Test\TestEnvelope;
18
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA;
19
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageAHandler;
20
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB;
21
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageBHandler;
22
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC;
23
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageD;
24
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageE;
25
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageF;
26
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageG;
27
use Zenstruck\Messenger\Test\Tests\Fixture\NoBundleKernel;
28
29
/**
30
 * @author Kevin Bond <[email protected]>
31
 */
32
final class InteractsWithMessengerTest extends WebTestCase
33
{
34
    use InteractsWithMessenger;
35
36
    /**
     * Dispatched messages wait on the queue, untouched by their handlers, until
     * process() runs: process(2) handles two of the three, process() drains the rest.
     *
     * @test
     */
    public function can_interact_with_queue(): void
    {
        self::bootKernel();

        $transport = $this->messenger();
        $transport->queue()->assertEmpty();

        $container = self::getContainer();
        $handlerA = $container->get(MessageAHandler::class);
        $handlerB = $container->get(MessageBHandler::class);
        self::assertEmpty($handlerA->messages);
        self::assertEmpty($handlerB->messages);

        $bus = $container->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageB());
        $bus->dispatch(new MessageA());

        // Everything is queued, nothing has been handled yet.
        $transport->queue()->assertCount(3);
        $transport->queue()->assertContains(MessageA::class);
        $transport->queue()->assertContains(MessageA::class, 2);
        $transport->queue()->assertContains(MessageB::class);
        $transport->queue()->assertContains(MessageB::class, 1);
        $transport->queue()->assertNotContains(MessageC::class);
        $transport->queue()->assertContains(MessageC::class, 0);
        self::assertEmpty($handlerA->messages);
        self::assertEmpty($handlerB->messages);

        $transport->process(2);

        // The first MessageA and the MessageB were handled; one MessageA remains.
        $transport->queue()->assertCount(1);
        $transport->queue()->assertContains(MessageA::class, 1);
        $transport->queue()->assertNotContains(MessageB::class);
        self::assertCount(1, $handlerA->messages);
        self::assertCount(1, $handlerB->messages);

        $transport->process();

        $transport->queue()->assertEmpty();
        $transport->queue()->assertNotContains(MessageA::class);
        $transport->queue()->assertNotContains(MessageB::class);
        self::assertCount(2, $handlerA->messages);
        self::assertCount(1, $handlerB->messages);
    }
77
78
    /**
     * Envelope collections expose back(), which allows queue/dispatched/acknowledged/
     * rejected assertions (and process()) to be written as one fluent chain.
     *
     * @test
     */
    public function can_use_envelope_collection_back(): void
    {
        self::bootKernel();

        // Before anything is dispatched every collection is empty.
        $this->messenger()
            ->queue()->assertEmpty()->back()
            ->dispatched()->assertEmpty()->back()
            ->acknowledged()->assertEmpty()->back()
            ->rejected()->assertEmpty()
        ;

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());

        $this->messenger()
            // queued but not yet handled
            ->queue()->assertCount(1)->back()
            ->dispatched()->assertCount(1)->back()
            ->acknowledged()->assertEmpty()->back()
            ->rejected()->assertEmpty()->back()
            ->process()
            // handled: off the queue and acknowledged
            ->queue()->assertEmpty()->back()
            ->dispatched()->assertCount(1)->back()
            ->acknowledged()->assertCount(1)->back()
            ->rejected()->assertEmpty()->back()
        ;
    }
106
107
    /**
     * After unblock(), dispatched messages are handled right away: nothing is
     * left on the queue and all three end up dispatched and acknowledged.
     *
     * @test
     */
    public function can_disable_intercept(): void
    {
        self::bootKernel();

        $transport = $this->messenger();
        $transport->unblock();

        $transport->queue()->assertEmpty();
        $transport->acknowledged()->assertEmpty();
        $transport->dispatched()->assertEmpty();

        $container = self::getContainer();
        $bus = $container->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageB());
        $bus->dispatch(new MessageA());

        $transport->queue()->assertEmpty();
        $transport->queue()->assertNotContains(MessageA::class);

        $transport->dispatched()->assertCount(3);
        $transport->dispatched()->assertContains(MessageA::class, 2);
        $transport->dispatched()->assertContains(MessageB::class, 1);

        $transport->acknowledged()->assertCount(3);
        $transport->acknowledged()->assertContains(MessageA::class, 2);
        $transport->acknowledged()->assertContains(MessageB::class, 1);

        self::assertCount(2, $container->get(MessageAHandler::class)->messages);
        self::assertCount(1, $container->get(MessageBHandler::class)->messages);
    }
135
136
    /**
     * Processing a queue that holds three messages drains it and runs each handler.
     *
     * NOTE(review): the method name mentions disabling intercept, but the body
     * calls process() rather than unblock() - confirm which one is intended.
     *
     * @test
     */
    public function disabling_intercept_with_items_on_queue_processes_all(): void
    {
        self::bootKernel();

        $container = self::getContainer();
        $bus = $container->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageB());
        $bus->dispatch(new MessageA());

        $transport = $this->messenger();
        $transport->queue()->assertCount(3);

        $transport->process();

        $transport->queue()->assertEmpty();
        self::assertCount(2, $container->get(MessageAHandler::class)->messages);
        self::assertCount(1, $container->get(MessageBHandler::class)->messages);
    }
155
156
    /**
     * Calling unblock() while messages are already queued processes them:
     * the queue ends up empty and both messages are acknowledged.
     *
     * @test
     */
    public function unblocking_processes_existing_messages_on_queue(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageB());

        $transport = $this->messenger();
        $transport->queue()->assertCount(2);
        $transport->acknowledged()->assertEmpty();

        $transport->unblock();

        $transport->queue()->assertEmpty();
        $transport->acknowledged()->assertCount(2);
    }
174
175
    /**
     * first() returns the first queued envelope, optionally narrowed by a message
     * class name or by a callable that receives either the envelope or the message.
     *
     * @test
     */
    public function can_access_envelope_collection_items_via_first(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch($m1 = new MessageA());
        $bus->dispatch($m2 = new MessageB());
        $bus->dispatch($m3 = new MessageA(true));

        $transport = $this->messenger();
        $transport->queue()->assertCount(3);

        // No filter: the very first envelope.
        self::assertSame($m1, $transport->queue()->first()->getMessage());
        // Class-name filter.
        self::assertSame($m2, $transport->queue()->first(MessageB::class)->getMessage());
        // Callable filters: envelope-typed, untyped and message-typed parameters.
        self::assertSame($m3, $transport->queue()->first(fn(Envelope $e) => $e->getMessage()->fail)->getMessage());
        self::assertSame($m3, $transport->queue()->first(fn($e) => $e->getMessage()->fail)->getMessage());
        self::assertSame($m3, $transport->queue()->first(fn(MessageA $m) => $m->fail)->getMessage());
    }
194
195
    /**
     * Calling first() on an empty queue raises a \RuntimeException.
     *
     * @test
     */
    public function envelope_collection_first_throws_exception_if_no_match(): void
    {
        self::bootKernel();

        $this->expectException(\RuntimeException::class);

        // Nothing was dispatched, so there is no envelope to return.
        $this->messenger()->queue()->first();
    }
206
207
    /**
     * Envelopes returned by first() support assertHasStamp()/assertNotHasStamp(),
     * and a missing stamp fails with a descriptive assertion message.
     *
     * @test
     */
    public function can_make_stamp_assertions_on_test_envelope(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA(), [new DelayStamp(1000)]);
        $bus->dispatch(new MessageB());

        $transport = $this->messenger();
        $transport->queue()->first()->assertHasStamp(DelayStamp::class);
        $transport->queue()->first(MessageB::class)->assertNotHasStamp(DelayStamp::class);

        $expectedFailure = \sprintf('Expected to find stamp "%s" but did not.', SerializerStamp::class);

        Assert::that(fn() => $this->messenger()->queue()->first()->assertHasStamp(SerializerStamp::class))
            ->throws(AssertionFailedError::class, $expectedFailure)
        ;
    }
224
225
    /**
     * In the "test" environment messenger() fails with a \LogicException
     * stating that no transports are registered.
     *
     * @test
     */
    public function cannot_access_queue_if_none_registered(): void
    {
        $kernelOptions = ['environment' => 'test'];
        self::bootKernel($kernelOptions);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('No transports registered');

        $this->messenger();
    }
237
238
    /**
     * messenger() is usable without an explicit bootKernel() call beforehand.
     *
     * @test
     */
    public function accessing_transport_boots_kernel_if_not_yet_booted(): void
    {
        // Deliberately no self::bootKernel() here.
        $transport = $this->messenger();

        $transport->queue()->assertEmpty();
    }
245
246
    /**
     * In the "multi_transport" environment MessageA is queued on "async1" while
     * MessageB is handled immediately and acknowledged on "async2".
     *
     * @test
     */
    public function can_interact_with_multiple_queues(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $async1 = $this->messenger('async1');
        $async2 = $this->messenger('async2');
        $async1->queue()->assertEmpty();
        $async2->queue()->assertEmpty();

        $container = self::getContainer();
        $handlerA = $container->get(MessageAHandler::class);
        $handlerB = $container->get(MessageBHandler::class);
        self::assertEmpty($handlerA->messages);
        self::assertEmpty($handlerB->messages);

        $bus = $container->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageB());
        $bus->dispatch(new MessageA());

        // Both MessageA instances wait on async1; MessageB has already been handled.
        $async1->queue()->assertCount(2);
        $async2->queue()->assertEmpty();
        $async1->queue()->assertContains(MessageA::class);
        $async1->queue()->assertContains(MessageA::class, 2);
        $async2->queue()->assertNotContains(MessageB::class);
        self::assertEmpty($handlerA->messages);
        self::assertCount(1, $handlerB->messages);

        $async1->process(1);

        $async1->queue()->assertCount(1);
        $async1->queue()->assertContains(MessageA::class, 1);
        $async2->queue()->assertNotContains(MessageB::class);
        self::assertCount(1, $handlerA->messages);
        self::assertCount(1, $handlerB->messages);

        $async1->process();

        $async1->queue()->assertEmpty();
        $async2->queue()->assertEmpty();
        $async2->acknowledged()->assertCount(1);
        $async2->acknowledged()->assertContains(MessageB::class, 1);
        self::assertCount(2, $handlerA->messages);
        self::assertCount(1, $handlerB->messages);
    }
287
288
    /**
     * After intercept() is called on "async2", dispatched MessageB instances
     * are held on its queue.
     *
     * @test
     */
    public function can_enable_intercept(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $async2 = $this->messenger('async2');
        $async2->intercept();

        $async2->queue()->assertEmpty();

        $container = self::getContainer();
        self::assertEmpty($container->get(MessageBHandler::class)->messages);

        $bus = $container->get(MessageBusInterface::class);
        $bus->dispatch(new MessageB());
        $bus->dispatch(new MessageB());

        $async2->queue()->assertCount(2);
    }
305
306
    /**
     * Requesting an unknown transport name raises an \InvalidArgumentException.
     *
     * @test
     */
    public function cannot_access_queue_that_does_not_exist(): void
    {
        $kernelOptions = ['environment' => 'multi_transport'];
        self::bootKernel($kernelOptions);

        $this->expectException(\InvalidArgumentException::class);
        $this->expectExceptionMessage('"invalid" not registered');

        $this->messenger('invalid');
    }
318
319
    /**
     * Requesting "async3" raises a \LogicException explaining that the transport
     * must be configured as "test://".
     *
     * @test
     */
    public function cannot_access_queue_that_is_not_test_transport(): void
    {
        $kernelOptions = ['environment' => 'multi_transport'];
        self::bootKernel($kernelOptions);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Transport "async3" needs to be set to "test://" in your test config to use this feature.');

        $this->messenger('async3');
    }
331
332
    /**
     * With several transports registered, messenger() without a name raises a
     * \LogicException that lists the available transports.
     *
     * @test
     */
    public function queue_name_is_required_if_using_multiple_transports(): void
    {
        $kernelOptions = ['environment' => 'multi_transport'];
        self::bootKernel($kernelOptions);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3), you must specify a name.');

        $this->messenger();
    }
344
345
    /**
     * queue()->messages() returns the queued message objects in dispatch order,
     * optionally filtered by message class.
     *
     * @test
     */
    public function can_access_message_objects_on_queue(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch($m1 = new MessageA());
        $bus->dispatch($m2 = new MessageB());
        $bus->dispatch($m3 = new MessageA());

        $transport = $this->messenger();

        self::assertSame([$m1, $m2, $m3], $transport->queue()->messages());
        self::assertSame([$m1, $m3], $transport->queue()->messages(MessageA::class));
        self::assertSame([$m2], $transport->queue()->messages(MessageB::class));
    }
360
361
    /**
     * The queue collection yields TestEnvelope instances both through all()
     * and when iterated, in dispatch order.
     *
     * @test
     */
    public function can_access_envelopes_on_envelope_collection(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch($m1 = new MessageA());
        $bus->dispatch($m2 = new MessageB());
        $bus->dispatch($m3 = new MessageA());

        // Unwraps an envelope to the message it carries.
        $unwrap = static fn(TestEnvelope $envelope) => $envelope->getMessage();

        $messages = \array_map($unwrap, $this->messenger()->queue()->all());
        $messagesFromIterator = \array_map($unwrap, \iterator_to_array($this->messenger()->queue()));

        $expected = [$m1, $m2, $m3];
        self::assertSame($expected, $messages);
        self::assertSame($expected, $messagesFromIterator);
    }
378
379
    /**
     * The queue, dispatched, acknowledged and rejected collections are countable;
     * after processing, one of the two messages is acknowledged and one rejected.
     *
     * @test
     */
    public function can_access_sent_acknowledged_and_rejected(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch($m1 = new MessageA(true));
        $bus->dispatch($m2 = new MessageB());

        $transport = $this->messenger();

        self::assertCount(2, $transport->queue());
        self::assertCount(2, $transport->dispatched());
        self::assertCount(0, $transport->acknowledged());
        self::assertCount(0, $transport->rejected());

        $transport->process();

        self::assertCount(0, $transport->queue());
        self::assertCount(2, $transport->dispatched());
        self::assertCount(1, $transport->acknowledged());
        self::assertCount(1, $transport->rejected());
    }
401
402
    /**
     * When the kernel is booted from NoBundleKernel, messenger() raises a
     * \LogicException asking whether the test bundle is enabled.
     *
     * @test
     */
    public function cannot_access_queue_if_bundle_not_enabled(): void
    {
        // Swap the kernel class for this boot only, then clear the override again.
        self::$class = NoBundleKernel::class;
        $kernelOptions = ['environment' => 'no_bundle'];
        self::bootKernel($kernelOptions);
        self::$class = null;

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Cannot access transport - is ZenstruckMessengerTestBundle enabled in your test environment?');

        $this->messenger();
    }
416
417
    /**
     * With throwExceptions() enabled, a failing handler's \RuntimeException
     * surfaces from process().
     *
     * @test
     */
    public function can_configure_throwing_exceptions(): void
    {
        self::bootKernel();

        $transport = $this->messenger();
        $transport->throwExceptions();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA(true));

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        $transport->process();
    }
433
434
    /**
     * With throwExceptions() and unblock() both applied, the handler's
     * \RuntimeException surfaces directly from dispatch().
     *
     * @test
     */
    public function can_configure_throwing_exceptions_with_intercept_disabled(): void
    {
        self::bootKernel();

        $this->messenger()->throwExceptions()->unblock();

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA(true));
    }
448
449
    /**
     * In the "multi_transport" environment a failing MessageB throws its
     * \RuntimeException from dispatch() without any call on the transport.
     *
     * @test
     */
    public function can_disable_exception_catching_in_transport_config(): void
    {
        $kernelOptions = ['environment' => 'multi_transport'];
        self::bootKernel($kernelOptions);

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageB(true));
    }
461
462
    /**
     * After catchExceptions() on "async2", a failing MessageB is recorded as
     * rejected instead of throwing from dispatch().
     *
     * @test
     */
    public function can_re_enable_exception_catching_if_disabled_in_transport_config(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $async2 = $this->messenger('async2');
        $async2->catchExceptions();

        $async2->rejected()->assertEmpty();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageB(true));

        $async2->rejected()->assertCount(1);
    }
477
478
    /**
     * Queue, dispatched, acknowledged and rejected data survive kernel
     * shutdown/reboot cycles and accumulate across client requests.
     *
     * @test
     */
    public function transport_data_is_persisted_between_requests_and_kernel_shutdown(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());
        $bus->dispatch(new MessageA(true));

        $this->messenger()->queue()->assertCount(2);

        // First reboot: the queued messages are still there.
        self::ensureKernelShutdown();
        self::bootKernel();

        $transport = $this->messenger();
        $transport->queue()->assertCount(2);

        $transport->process();

        // Second reboot: the processing results are still there.
        self::ensureKernelShutdown();
        self::bootKernel();

        $transport = $this->messenger();
        $transport->queue()->assertEmpty();
        $transport->dispatched()->assertCount(2);
        $transport->acknowledged()->assertCount(1);
        $transport->rejected()->assertCount(1);

        self::ensureKernelShutdown();

        $client = self::createClient();

        $client->request('GET', '/dispatch');

        // The transport is looked up again after every request.
        $transport = $this->messenger();
        $transport->queue()->assertCount(1);
        $transport->dispatched()->assertCount(3);
        $transport->acknowledged()->assertCount(1);
        $transport->rejected()->assertCount(1);

        $client->request('GET', '/dispatch');

        $transport = $this->messenger();
        $transport->queue()->assertCount(2);
        $transport->dispatched()->assertCount(4);
        $transport->acknowledged()->assertCount(1);
        $transport->rejected()->assertCount(1);
    }
523
524
    /**
     * reset() empties a queue that previously held a message.
     *
     * @test
     */
    public function can_reset_transport_data(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());

        $transport = $this->messenger();
        $transport->queue()->assertNotEmpty();

        $transport->reset();

        $transport->queue()->assertEmpty();
    }
539
540
    /**
     * unblock() stays in effect after the kernel is shut down and booted again:
     * messages dispatched afterwards still bypass the queue.
     *
     * @test
     */
    public function disabling_intercept_is_remembered_between_kernel_reboots(): void
    {
        self::bootKernel();

        $transport = $this->messenger();
        $transport->unblock();

        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());

        $transport->queue()->assertEmpty();
        $transport->dispatched()->assertCount(1);

        self::ensureKernelShutdown();
        self::bootKernel();

        // Fresh kernel: fetch the bus and the transport again.
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());

        $transport = $this->messenger();
        $transport->queue()->assertEmpty();
        $transport->dispatched()->assertCount(2);
    }
562
563
    /**
     * throwExceptions() stays in effect after the kernel is shut down and booted
     * again: a failing handler's \RuntimeException still surfaces from process().
     *
     * @test
     */
    public function throwing_exceptions_is_remembered_between_kernel_reboots(): void
    {
        self::bootKernel();

        $this->messenger()->throwExceptions();

        self::ensureKernelShutdown();
        self::bootKernel();

        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));

        $this->expectException(\RuntimeException::class);
        // expectExceptionMessage() is the exception-expectation API used by the
        // sibling tests; expectErrorMessage() belongs to the deprecated PHP-error
        // expectation family.
        $this->expectExceptionMessage('handling failed...');

        $this->messenger()->process();
    }
582
583
    /**
     * An envelope passed to send() is queued without being handled, and is
     * handled once process() runs.
     *
     * @test
     */
    public function can_manually_send_message_to_transport_and_process(): void
    {
        self::bootKernel();

        $transport = $this->messenger();
        $handlerA = self::getContainer()->get(MessageAHandler::class);

        $transport->queue()->assertEmpty();
        self::assertEmpty($handlerA->messages);

        $transport->send(Envelope::wrap(new MessageA()));

        $transport->queue()->assertCount(1);
        self::assertEmpty($handlerA->messages);

        $transport->process();

        $transport->queue()->assertEmpty();
        self::assertCount(1, $handlerA->messages);
    }
603
604
    /**
     * A single process() call after dispatching MessageD leaves MessageD,
     * MessageE and MessageF all acknowledged and the queue empty.
     *
     * @test
     */
    public function process_all_is_recursive(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageD());

        $transport = $this->messenger();
        $transport->queue()->assertCount(1);
        $transport->queue()->assertContains(MessageD::class, 1);

        $transport->process();

        $transport->queue()->assertEmpty();
        $transport->dispatched()->assertCount(3);

        $transport->acknowledged()->assertCount(3);
        $transport->acknowledged()->assertContains(MessageD::class, 1);
        $transport->acknowledged()->assertContains(MessageE::class, 1);
        $transport->acknowledged()->assertContains(MessageF::class, 1);
    }
625
626
    /**
     * process(1) on a queued MessageD leaves MessageE queued; a following
     * process(2) leaves MessageE and MessageF acknowledged and the queue empty.
     *
     * @test
     */
    public function process_x_messages_is_recursive(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageD());

        $transport = $this->messenger();
        $transport->queue()->assertCount(1);
        $transport->queue()->assertContains(MessageD::class, 1);

        $transport->process(1);

        // MessageD is done and MessageE now sits on the queue.
        $transport->queue()->assertCount(1);
        $transport->queue()->assertContains(MessageE::class, 1);
        $transport->acknowledged()->assertCount(1);
        $transport->acknowledged()->assertContains(MessageD::class, 1);

        $transport->process(2);

        $transport->queue()->assertEmpty();
        $transport->acknowledged()->assertCount(3);
        $transport->acknowledged()->assertContains(MessageE::class, 1);
        $transport->acknowledged()->assertContains(MessageF::class, 1);
    }
652
653
    /**
     * process(2) with a single queued message fails with an assertion error,
     * yet the one available message is still processed and acknowledged.
     *
     * @test
     */
    public function fails_if_trying_to_process_more_messages_than_can_be_processed(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());

        $processTwo = fn() => $this->messenger()->process(2);

        Assert::that($processTwo)->throws(function(AssertionFailedError $e) {
            self::assertStringContainsString('Expected to process 2 messages but only processed 1.', $e->getMessage());
            $this->messenger()->queue()->assertEmpty();
            $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
        });
    }
668
669
    /**
     * processOrFail() handles a queued message: the queue ends up empty and
     * the message is acknowledged.
     *
     * @test
     */
    public function process_or_fail_processes_messages(): void
    {
        self::bootKernel();

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());

        $transport = $this->messenger();
        $transport->queue()->assertCount(1);
        $transport->queue()->assertContains(MessageA::class, 1);

        $transport->processOrFail();

        $transport->queue()->assertEmpty();
        $transport->acknowledged()->assertCount(1);
        $transport->acknowledged()->assertContains(MessageA::class, 1);
    }
687
688
    /**
     * processOrFail() on an empty queue fails with "No messages to process.".
     *
     * @test
     */
    public function process_or_fail_fails_if_no_messages_on_queue(): void
    {
        self::bootKernel();

        $processEmptyQueue = fn() => $this->messenger()->processOrFail();

        Assert::that($processEmptyQueue)
            ->throws(AssertionFailedError::class, 'No messages to process.')
        ;
    }
699
700
    /**
     * Pins the failure messages produced by the envelope-collection assertions,
     * first against an empty collection and then against one holding a MessageA.
     *
     * @test
     */
    public function envelope_collection_assertions(): void
    {
        self::bootKernel();

        // Nothing dispatched yet.
        Assert::that(fn() => $this->messenger()->dispatched()->assertCount(2))
            ->throws(AssertionFailedError::class, 'Expected 2 messages but 0 messages found.')
        ;
        $notFound = \sprintf('Message "%s" not found.', MessageA::class);
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class))
            ->throws(AssertionFailedError::class, $notFound)
        ;
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotEmpty())
            ->throws(AssertionFailedError::class, 'Expected some messages but found none.')
        ;

        $bus = self::getContainer()->get(MessageBusInterface::class);
        $bus->dispatch(new MessageA());

        // One MessageA dispatched.
        Assert::that(fn() => $this->messenger()->dispatched()->assertEmpty())
            ->throws(AssertionFailedError::class, 'Expected 0 messages but 1 messages found.')
        ;
        $wrongCount = \sprintf('Expected to find "%s" 2 times but found 1 times.', MessageA::class);
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class, 2))
            ->throws(AssertionFailedError::class, $wrongCount)
        ;
        $unexpectedlyFound = \sprintf('Found message "%s" but should not.', MessageA::class);
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotContains(MessageA::class))
            ->throws(AssertionFailedError::class, $unexpectedlyFound)
        ;
    }
729
730
    /**
     * process() fires WorkerMessageHandledEvent: a listener registered on the
     * event dispatcher receives exactly the dispatched message.
     *
     * @test
     */
    public function messenger_worker_events_are_dispatched_when_processing(): void
    {
        $handled = [];

        self::bootKernel();

        $container = self::getContainer();

        // Record every message the worker reports as handled.
        $recorder = static function(WorkerMessageHandledEvent $event) use (&$handled) {
            $handled[] = $event->getEnvelope()->getMessage();
        };
        $container->get('event_dispatcher')->addListener(WorkerMessageHandledEvent::class, $recorder);

        $container->get(MessageBusInterface::class)->dispatch($message = new MessageA());

        $this->messenger()->process();

        self::assertCount(1, $handled);
        self::assertSame($message, $handled[0]);
    }
753
754
    /**
     * Sending a MessageG envelope fails with an assertion error on "async1"
     * but runs without error on "async2".
     *
     * @test
     */
    public function serialization_problem_assertions(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        // Builds a callable that sends a MessageG envelope to the named transport.
        $sendMessageGTo = fn(string $name) => fn() => $this->messenger($name)->send(new Envelope(new MessageG()));

        // "MessageG" is not serializable, but only transport async1 should catch this.
        Assert::that($sendMessageGTo('async1'))
            ->throws(AssertionFailedError::class);

        Assert::run($sendMessageGTo('async2'));
    }
767
768
    /**
     * Boots the kernel, defaulting the environment to "single_transport"
     * unless the caller supplies its own "environment" option.
     *
     * @param array $options kernel options; entries override the defaults
     */
    protected static function bootKernel(array $options = []): KernelInterface
    {
        $defaults = ['environment' => 'single_transport'];

        return parent::bootKernel(\array_merge($defaults, $options));
    }
772
773
    /**
     * Returns the container, using the parent's getContainer() when the parent
     * class defines it and the static $container property otherwise.
     */
    protected static function getContainer(): ContainerInterface
    {
        $parentHasGetter = \method_exists(parent::class, 'getContainer');

        return $parentHasGetter ? parent::getContainer() : self::$container;
    }
781
}
782