Passed
Push — 1.x ( cf2ec7...97fb25 )
by Kevin
01:29
created

serialization_problem_assertions()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 4
c 1
b 0
f 1
nc 1
nop 0
dl 0
loc 10
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Tests;
4
5
use PHPUnit\Framework\AssertionFailedError;
6
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\HttpKernel\KernelInterface;
9
use Symfony\Component\Messenger\Envelope;
10
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Stamp\DelayStamp;
13
use Symfony\Component\Messenger\Stamp\SerializerStamp;
14
use Zenstruck\Assert;
15
use Zenstruck\Messenger\Test\InteractsWithMessenger;
16
use Zenstruck\Messenger\Test\TestEnvelope;
17
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA;
18
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageAHandler;
19
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB;
20
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageBHandler;
21
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC;
22
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageD;
23
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageE;
24
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageF;
25
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageG;
26
use Zenstruck\Messenger\Test\Tests\Fixture\NoBundleKernel;
27
28
/**
29
 * @author Kevin Bond <[email protected]>
30
 */
31
final class InteractsWithMessengerTest extends WebTestCase
32
{
33
    use InteractsWithMessenger;
34
35
    /**
36
     * @test
37
     */
38
    public function can_interact_with_queue(): void
39
    {
40
        self::bootKernel();
41
42
        $this->messenger()->queue()->assertEmpty();
43
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
44
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
45
46
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
47
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
48
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
49
50
        $this->messenger()->queue()->assertCount(3);
51
        $this->messenger()->queue()->assertContains(MessageA::class);
52
        $this->messenger()->queue()->assertContains(MessageA::class, 2);
53
        $this->messenger()->queue()->assertContains(MessageB::class);
54
        $this->messenger()->queue()->assertContains(MessageB::class, 1);
55
        $this->messenger()->queue()->assertNotContains(MessageC::class);
56
        $this->messenger()->queue()->assertContains(MessageC::class, 0);
57
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
58
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
59
60
        $this->messenger()->process(2);
61
62
        $this->messenger()->queue()->assertCount(1);
63
        $this->messenger()->queue()->assertContains(MessageA::class, 1);
64
        $this->messenger()->queue()->assertNotContains(MessageB::class);
65
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
66
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
67
68
        $this->messenger()->process();
69
70
        $this->messenger()->queue()->assertEmpty();
71
        $this->messenger()->queue()->assertNotContains(MessageA::class);
72
        $this->messenger()->queue()->assertNotContains(MessageB::class);
73
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
74
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
75
    }
76
77
    /**
78
     * @test
79
     */
80
    public function can_use_envelope_collection_back(): void
81
    {
82
        self::bootKernel();
83
84
        $this->messenger()
85
            ->queue()->assertEmpty()->back()
86
            ->dispatched()->assertEmpty()->back()
87
            ->acknowledged()->assertEmpty()->back()
88
            ->rejected()->assertEmpty()
89
        ;
90
91
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
92
93
        $this->messenger()
94
            ->queue()->assertCount(1)->back()
95
            ->dispatched()->assertCount(1)->back()
96
            ->acknowledged()->assertEmpty()->back()
97
            ->rejected()->assertEmpty()->back()
98
            ->process()
99
            ->queue()->assertEmpty()->back()
100
            ->dispatched()->assertCount(1)->back()
101
            ->acknowledged()->assertCount(1)->back()
102
            ->rejected()->assertEmpty()->back()
103
        ;
104
    }
105
106
    /**
107
     * @test
108
     */
109
    public function can_disable_intercept(): void
110
    {
111
        self::bootKernel();
112
113
        $this->messenger()->unblock();
114
115
        $this->messenger()->queue()->assertEmpty();
116
        $this->messenger()->acknowledged()->assertEmpty();
117
        $this->messenger()->dispatched()->assertEmpty();
118
119
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
120
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
121
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
122
123
        $this->messenger()->queue()->assertEmpty();
124
        $this->messenger()->queue()->assertNotContains(MessageA::class);
125
        $this->messenger()->dispatched()->assertCount(3);
126
        $this->messenger()->dispatched()->assertContains(MessageA::class, 2);
127
        $this->messenger()->dispatched()->assertContains(MessageB::class, 1);
128
        $this->messenger()->acknowledged()->assertCount(3);
129
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 2);
130
        $this->messenger()->acknowledged()->assertContains(MessageB::class, 1);
131
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
132
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
133
    }
134
135
    /**
136
     * @test
137
     */
138
    public function disabling_intercept_with_items_on_queue_processes_all(): void
139
    {
140
        self::bootKernel();
141
142
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
143
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
144
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
145
146
        $this->messenger()->queue()->assertCount(3);
147
148
        $this->messenger()->process();
149
150
        $this->messenger()->queue()->assertEmpty();
151
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
152
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
153
    }
154
155
    /**
156
     * @test
157
     */
158
    public function unblocking_processes_existing_messages_on_queue(): void
159
    {
160
        self::bootKernel();
161
162
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
163
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
164
165
        $this->messenger()->queue()->assertCount(2);
166
        $this->messenger()->acknowledged()->assertEmpty();
167
168
        $this->messenger()->unblock();
169
170
        $this->messenger()->queue()->assertEmpty();
171
        $this->messenger()->acknowledged()->assertCount(2);
172
    }
173
174
    /**
175
     * @test
176
     */
177
    public function can_access_envelope_collection_items_via_first(): void
178
    {
179
        self::bootKernel();
180
181
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
182
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
183
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA(true));
184
185
        $this->messenger()->queue()->assertCount(3);
186
187
        $this->assertSame($m1, $this->messenger()->queue()->first()->getMessage());
0 ignored issues
show
Bug introduced by
The method getMessage() does not exist on Zenstruck\Messenger\Test\TestEnvelope. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

187
        $this->assertSame($m1, $this->messenger()->queue()->first()->/** @scrutinizer ignore-call */ getMessage());
Loading history...
188
        $this->assertSame($m2, $this->messenger()->queue()->first(MessageB::class)->getMessage());
189
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(Envelope $e) => $e->getMessage()->fail)->getMessage());
190
        $this->assertSame($m3, $this->messenger()->queue()->first(fn($e) => $e->getMessage()->fail)->getMessage());
191
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(MessageA $m) => $m->fail)->getMessage());
192
    }
193
194
    /**
195
     * @test
196
     */
197
    public function envelope_collection_first_throws_exception_if_no_match(): void
198
    {
199
        self::bootKernel();
200
201
        $this->expectException(\RuntimeException::class);
202
203
        $this->messenger()->queue()->first();
204
    }
205
206
    /**
207
     * @test
208
     */
209
    public function can_make_stamp_assertions_on_test_envelope(): void
210
    {
211
        self::bootKernel();
212
213
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(), [new DelayStamp(1000)]);
214
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
215
216
        $this->messenger()->queue()->first()->assertHasStamp(DelayStamp::class);
217
        $this->messenger()->queue()->first(MessageB::class)->assertNotHasStamp(DelayStamp::class);
218
219
        Assert::that(fn() => $this->messenger()->queue()->first()->assertHasStamp(SerializerStamp::class))
220
            ->throws(AssertionFailedError::class, \sprintf('Expected to find stamp "%s" but did not.', SerializerStamp::class))
221
        ;
222
    }
223
224
    /**
225
     * @test
226
     */
227
    public function cannot_access_queue_if_none_registered(): void
228
    {
229
        self::bootKernel(['environment' => 'test']);
230
231
        $this->expectException(\LogicException::class);
232
        $this->expectExceptionMessage('No transports registered');
233
234
        $this->messenger();
235
    }
236
237
    /**
238
     * @test
239
     */
240
    public function accessing_transport_boots_kernel_if_not_yet_booted(): void
241
    {
242
        $this->messenger()->queue()->assertEmpty();
243
    }
244
245
    /**
246
     * @test
247
     */
248
    public function can_interact_with_multiple_queues(): void
249
    {
250
        self::bootKernel(['environment' => 'multi_transport']);
251
252
        $this->messenger('async1')->queue()->assertEmpty();
253
        $this->messenger('async2')->queue()->assertEmpty();
254
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
255
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
256
257
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
258
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
259
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
260
261
        $this->messenger('async1')->queue()->assertCount(2);
262
        $this->messenger('async2')->queue()->assertEmpty();
263
        $this->messenger('async1')->queue()->assertContains(MessageA::class);
264
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 2);
265
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
266
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
267
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
268
269
        $this->messenger('async1')->process(1);
270
271
        $this->messenger('async1')->queue()->assertCount(1);
272
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 1);
273
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
274
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
275
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
276
277
        $this->messenger('async1')->process();
278
279
        $this->messenger('async1')->queue()->assertEmpty();
280
        $this->messenger('async2')->queue()->assertEmpty();
281
        $this->messenger('async2')->acknowledged()->assertCount(1);
282
        $this->messenger('async2')->acknowledged()->assertContains(MessageB::class, 1);
283
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
284
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
285
    }
286
287
    /**
288
     * @test
289
     */
290
    public function can_enable_intercept(): void
291
    {
292
        self::bootKernel(['environment' => 'multi_transport']);
293
294
        $this->messenger('async2')->intercept();
295
296
        $this->messenger('async2')->queue()->assertEmpty();
297
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
298
299
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
300
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
301
302
        $this->messenger('async2')->queue()->assertCount(2);
303
    }
304
305
    /**
306
     * @test
307
     */
308
    public function cannot_access_queue_that_does_not_exist(): void
309
    {
310
        self::bootKernel(['environment' => 'multi_transport']);
311
312
        $this->expectException(\InvalidArgumentException::class);
313
        $this->expectExceptionMessage('"invalid" not registered');
314
315
        $this->messenger('invalid');
316
    }
317
318
    /**
319
     * @test
320
     */
321
    public function cannot_access_queue_that_is_not_test_transport(): void
322
    {
323
        self::bootKernel(['environment' => 'multi_transport']);
324
325
        $this->expectException(\LogicException::class);
326
        $this->expectExceptionMessage('Transport "async3" needs to be set to "test://" in your test config to use this feature.');
327
328
        $this->messenger('async3');
329
    }
330
331
    /**
332
     * @test
333
     */
334
    public function queue_name_is_required_if_using_multiple_transports(): void
335
    {
336
        self::bootKernel(['environment' => 'multi_transport']);
337
338
        $this->expectException(\LogicException::class);
339
        $this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3), you must specify a name.');
340
341
        $this->messenger();
342
    }
343
344
    /**
345
     * @test
346
     */
347
    public function can_access_message_objects_on_queue(): void
348
    {
349
        self::bootKernel();
350
351
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
352
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
353
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
354
355
        $this->assertSame([$m1, $m2, $m3], $this->messenger()->queue()->messages());
356
        $this->assertSame([$m1, $m3], $this->messenger()->queue()->messages(MessageA::class));
357
        $this->assertSame([$m2], $this->messenger()->queue()->messages(MessageB::class));
358
    }
359
360
    /**
361
     * @test
362
     */
363
    public function can_access_envelopes_on_envelope_collection(): void
364
    {
365
        self::bootKernel();
366
367
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
368
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
369
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
370
371
        $messages = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), $this->messenger()->queue()->all());
372
        $messagesFromIterator = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), \iterator_to_array($this->messenger()->queue()));
373
374
        $this->assertSame([$m1, $m2, $m3], $messages);
375
        $this->assertSame([$m1, $m2, $m3], $messagesFromIterator);
376
    }
377
378
    /**
379
     * @test
380
     */
381
    public function can_access_sent_acknowledged_and_rejected(): void
382
    {
383
        self::bootKernel();
384
385
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA(true));
386
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
387
388
        $this->assertCount(2, $this->messenger()->queue());
389
        $this->assertCount(2, $this->messenger()->dispatched());
390
        $this->assertCount(0, $this->messenger()->acknowledged());
391
        $this->assertCount(0, $this->messenger()->rejected());
392
393
        $this->messenger()->process();
394
395
        $this->assertCount(0, $this->messenger()->queue());
396
        $this->assertCount(2, $this->messenger()->dispatched());
397
        $this->assertCount(1, $this->messenger()->acknowledged());
398
        $this->assertCount(1, $this->messenger()->rejected());
399
    }
400
401
    /**
402
     * @test
403
     */
404
    public function cannot_access_queue_if_bundle_not_enabled(): void
405
    {
406
        self::$class = NoBundleKernel::class;
407
        self::bootKernel(['environment' => 'no_bundle']);
408
        self::$class = null;
409
410
        $this->expectException(\LogicException::class);
411
        $this->expectExceptionMessage('Cannot access transport - is ZenstruckMessengerTestBundle enabled in your test environment?');
412
413
        $this->messenger();
414
    }
415
416
    /**
417
     * @test
418
     */
419
    public function can_configure_throwing_exceptions(): void
420
    {
421
        self::bootKernel();
422
423
        $this->messenger()->throwExceptions();
424
425
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
426
427
        $this->expectException(\RuntimeException::class);
428
        $this->expectExceptionMessage('handling failed...');
429
430
        $this->messenger()->process();
431
    }
432
433
    /**
434
     * @test
435
     */
436
    public function can_configure_throwing_exceptions_with_intercept_disabled(): void
437
    {
438
        self::bootKernel();
439
440
        $this->messenger()->throwExceptions()->unblock();
441
442
        $this->expectException(\RuntimeException::class);
443
        $this->expectExceptionMessage('handling failed...');
444
445
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
446
    }
447
448
    /**
449
     * @test
450
     */
451
    public function can_disable_exception_catching_in_transport_config(): void
452
    {
453
        self::bootKernel(['environment' => 'multi_transport']);
454
455
        $this->expectException(\RuntimeException::class);
456
        $this->expectExceptionMessage('handling failed...');
457
458
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
459
    }
460
461
    /**
462
     * @test
463
     */
464
    public function can_re_enable_exception_catching_if_disabled_in_transport_config(): void
465
    {
466
        self::bootKernel(['environment' => 'multi_transport']);
467
468
        $this->messenger('async2')->catchExceptions();
469
470
        $this->messenger('async2')->rejected()->assertEmpty();
471
472
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
473
474
        $this->messenger('async2')->rejected()->assertCount(1);
475
    }
476
477
    /**
478
     * @test
479
     */
480
    public function transport_data_is_persisted_between_requests_and_kernel_shutdown(): void
481
    {
482
        self::bootKernel();
483
484
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
485
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
486
487
        $this->messenger()->queue()->assertCount(2);
488
489
        self::ensureKernelShutdown();
490
        self::bootKernel();
491
492
        $this->messenger()->queue()->assertCount(2);
493
494
        $this->messenger()->process();
495
496
        self::ensureKernelShutdown();
497
        self::bootKernel();
498
499
        $this->messenger()->queue()->assertEmpty();
500
        $this->messenger()->dispatched()->assertCount(2);
501
        $this->messenger()->acknowledged()->assertCount(1);
502
        $this->messenger()->rejected()->assertCount(1);
503
504
        self::ensureKernelShutdown();
505
506
        $client = self::createClient();
507
508
        $client->request('GET', '/dispatch');
509
510
        $this->messenger()->queue()->assertCount(1);
511
        $this->messenger()->dispatched()->assertCount(3);
512
        $this->messenger()->acknowledged()->assertCount(1);
513
        $this->messenger()->rejected()->assertCount(1);
514
515
        $client->request('GET', '/dispatch');
516
517
        $this->messenger()->queue()->assertCount(2);
518
        $this->messenger()->dispatched()->assertCount(4);
519
        $this->messenger()->acknowledged()->assertCount(1);
520
        $this->messenger()->rejected()->assertCount(1);
521
    }
522
523
    /**
524
     * @test
525
     */
526
    public function can_reset_transport_data(): void
527
    {
528
        self::bootKernel();
529
530
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
531
532
        $this->messenger()->queue()->assertNotEmpty();
533
534
        $this->messenger()->reset();
535
536
        $this->messenger()->queue()->assertEmpty();
537
    }
538
539
    /**
540
     * @test
541
     */
542
    public function disabling_intercept_is_remembered_between_kernel_reboots(): void
543
    {
544
        self::bootKernel();
545
546
        $this->messenger()->unblock();
547
548
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
549
550
        $this->messenger()->queue()->assertEmpty();
551
        $this->messenger()->dispatched()->assertCount(1);
552
553
        self::ensureKernelShutdown();
554
        self::bootKernel();
555
556
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
557
558
        $this->messenger()->queue()->assertEmpty();
559
        $this->messenger()->dispatched()->assertCount(2);
560
    }
561
562
    /**
563
     * @test
564
     */
565
    public function throwing_exceptions_is_remembered_between_kernel_reboots(): void
566
    {
567
        self::bootKernel();
568
569
        $this->messenger()->throwExceptions();
570
571
        self::ensureKernelShutdown();
572
        self::bootKernel();
573
574
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
575
576
        $this->expectException(\RuntimeException::class);
577
        $this->expectErrorMessage('handling failed...');
578
579
        $this->messenger()->process();
580
    }
581
582
    /**
583
     * @test
584
     */
585
    public function can_manually_send_message_to_transport_and_process(): void
586
    {
587
        self::bootKernel();
588
589
        $this->messenger()->queue()->assertEmpty();
590
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
591
592
        $this->messenger()->send(Envelope::wrap(new MessageA()));
593
594
        $this->messenger()->queue()->assertCount(1);
595
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
596
597
        $this->messenger()->process();
598
599
        $this->messenger()->queue()->assertEmpty();
600
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
601
    }
602
603
    /**
604
     * @test
605
     */
606
    public function process_all_is_recursive(): void
607
    {
608
        self::bootKernel();
609
610
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
611
612
        $this->messenger()->queue()->assertCount(1);
613
        $this->messenger()->queue()->assertContains(MessageD::class, 1);
614
615
        $this->messenger()->process();
616
617
        $this->messenger()->queue()->assertEmpty();
618
        $this->messenger()->dispatched()->assertCount(3);
619
        $this->messenger()->acknowledged()->assertCount(3);
620
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
621
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
622
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
623
    }
624
625
    /**
626
     * @test
627
     */
628
    public function process_x_messages_is_recursive(): void
629
    {
630
        self::bootKernel();
631
632
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
633
634
        $this->messenger()->queue()->assertCount(1);
635
        $this->messenger()->queue()->assertContains(MessageD::class, 1);
636
637
        $this->messenger()->process(1);
638
639
        $this->messenger()->queue()->assertCount(1);
640
        $this->messenger()->queue()->assertContains(MessageE::class, 1);
641
        $this->messenger()->acknowledged()->assertCount(1);
642
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
643
644
        $this->messenger()->process(2);
645
646
        $this->messenger()->queue()->assertEmpty();
647
        $this->messenger()->acknowledged()->assertCount(3);
648
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
649
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
650
    }
651
652
    /**
653
     * @test
654
     */
655
    public function process_x_recursive_when_intercept_disabled(): void
656
    {
657
        self::bootKernel();
658
659
        $this->messenger()->unblock();
660
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
661
662
        $this->messenger()->acknowledged()->assertCount(3);
663
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
664
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
665
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
666
    }
667
668
    /**
669
     * @test
670
     */
671
    public function fails_if_trying_to_process_more_messages_than_can_be_processed(): void
672
    {
673
        self::bootKernel();
674
675
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
676
677
        Assert::that(fn() => $this->messenger()->process(2))->throws(function(AssertionFailedError $e) {
678
            $this->assertStringContainsString('Expected to process 2 messages but only processed 1.', $e->getMessage());
679
            $this->messenger()->queue()->assertEmpty();
680
            $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
681
        });
682
    }
683
684
    /**
685
     * @test
686
     */
687
    public function process_or_fail_processes_messages(): void
688
    {
689
        self::bootKernel();
690
691
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
692
693
        $this->messenger()->queue()->assertCount(1);
694
        $this->messenger()->queue()->assertContains(MessageA::class, 1);
695
696
        $this->messenger()->processOrFail();
697
698
        $this->messenger()->queue()->assertEmpty();
699
        $this->messenger()->acknowledged()->assertCount(1);
700
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
701
    }
702
703
    /**
704
     * @test
705
     */
706
    public function process_or_fail_fails_if_no_messages_on_queue(): void
707
    {
708
        self::bootKernel();
709
710
        Assert::that(fn() => $this->messenger()->processOrFail())
711
            ->throws(AssertionFailedError::class, 'No messages to process.')
712
        ;
713
    }
714
715
    /**
716
     * @test
717
     */
718
    public function envelope_collection_assertions(): void
719
    {
720
        self::bootKernel();
721
722
        Assert::that(fn() => $this->messenger()->dispatched()->assertCount(2))
723
            ->throws(AssertionFailedError::class, 'Expected 2 messages but 0 messages found.')
724
        ;
725
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class))
726
            ->throws(AssertionFailedError::class, \sprintf('Message "%s" not found.', MessageA::class))
727
        ;
728
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotEmpty())
729
            ->throws(AssertionFailedError::class, 'Expected some messages but found none.')
730
        ;
731
732
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
733
734
        Assert::that(fn() => $this->messenger()->dispatched()->assertEmpty())
735
            ->throws(AssertionFailedError::class, 'Expected 0 messages but 1 messages found.')
736
        ;
737
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class, 2))
738
            ->throws(AssertionFailedError::class, \sprintf('Expected to find "%s" 2 times but found 1 times.', MessageA::class))
739
        ;
740
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotContains(MessageA::class))
741
            ->throws(AssertionFailedError::class, \sprintf('Found message "%s" but should not.', MessageA::class))
742
        ;
743
    }
744
745
    /**
746
     * @test
747
     */
748
    public function messenger_worker_events_are_dispatched_when_processing(): void
749
    {
750
        $messages = [];
751
752
        self::bootKernel();
753
754
        self::getContainer()->get('event_dispatcher')->addListener(
755
            WorkerMessageHandledEvent::class,
756
            static function(WorkerMessageHandledEvent $event) use (&$messages) {
757
                $messages[] = $event->getEnvelope()->getMessage();
758
            }
759
        );
760
761
        self::getContainer()->get(MessageBusInterface::class)->dispatch($message = new MessageA());
762
763
        $this->messenger()->process();
764
765
        $this->assertCount(1, $messages);
766
        $this->assertSame($message, $messages[0]);
767
    }
768
769
    /**
770
     * @test
771
     */
772
    public function serialization_problem_assertions(): void
773
    {
774
        self::bootKernel(['environment' => 'multi_transport']);
775
776
        // "MessageG" is not serializable, but only transport async1 should catch this.
777
        Assert::that(fn() => $this->messenger('async1')->send(new Envelope(new MessageG())))
778
            ->throws(AssertionFailedError::class)
779
        ;
780
781
        Assert::run(fn() => $this->messenger('async2')->send(new Envelope(new MessageG())));
782
    }
783
784
    protected static function bootKernel(array $options = []): KernelInterface
785
    {
786
        return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options));
787
    }
788
789
    protected static function getContainer(): ContainerInterface
790
    {
791
        if (\method_exists(parent::class, 'getContainer')) {
792
            return parent::getContainer();
793
        }
794
795
        return self::$container;
796
    }
797
}
798