Passed
Push — 1.x ( 8aac8d...cf2ec7 )
by Kevin
02:15 queued 52s
created

process_x_recursive_when_intercept_disabled()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 7
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 11
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Tests;
4
5
use PHPUnit\Framework\AssertionFailedError;
6
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\HttpKernel\KernelInterface;
9
use Symfony\Component\Messenger\Envelope;
10
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Stamp\DelayStamp;
13
use Symfony\Component\Messenger\Stamp\SerializerStamp;
14
use Zenstruck\Assert;
15
use Zenstruck\Messenger\Test\InteractsWithMessenger;
16
use Zenstruck\Messenger\Test\TestEnvelope;
17
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA;
18
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageAHandler;
19
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB;
20
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageBHandler;
21
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC;
22
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageD;
23
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageE;
24
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageF;
25
use Zenstruck\Messenger\Test\Tests\Fixture\NoBundleKernel;
26
27
/**
28
 * @author Kevin Bond <[email protected]>
29
 */
30
final class InteractsWithMessengerTest extends WebTestCase
31
{
32
    use InteractsWithMessenger;
33
34
    /**
35
     * @test
36
     */
37
    public function can_interact_with_queue(): void
38
    {
39
        self::bootKernel();
40
41
        $this->messenger()->queue()->assertEmpty();
42
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
43
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
44
45
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
46
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
47
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
48
49
        $this->messenger()->queue()->assertCount(3);
50
        $this->messenger()->queue()->assertContains(MessageA::class);
51
        $this->messenger()->queue()->assertContains(MessageA::class, 2);
52
        $this->messenger()->queue()->assertContains(MessageB::class);
53
        $this->messenger()->queue()->assertContains(MessageB::class, 1);
54
        $this->messenger()->queue()->assertNotContains(MessageC::class);
55
        $this->messenger()->queue()->assertContains(MessageC::class, 0);
56
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
57
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
58
59
        $this->messenger()->process(2);
60
61
        $this->messenger()->queue()->assertCount(1);
62
        $this->messenger()->queue()->assertContains(MessageA::class, 1);
63
        $this->messenger()->queue()->assertNotContains(MessageB::class);
64
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
65
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
66
67
        $this->messenger()->process();
68
69
        $this->messenger()->queue()->assertEmpty();
70
        $this->messenger()->queue()->assertNotContains(MessageA::class);
71
        $this->messenger()->queue()->assertNotContains(MessageB::class);
72
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
73
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
74
    }
75
76
    /**
77
     * @test
78
     */
79
    public function can_use_envelope_collection_back(): void
80
    {
81
        self::bootKernel();
82
83
        $this->messenger()
84
            ->queue()->assertEmpty()->back()
85
            ->dispatched()->assertEmpty()->back()
86
            ->acknowledged()->assertEmpty()->back()
87
            ->rejected()->assertEmpty()
88
        ;
89
90
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
91
92
        $this->messenger()
93
            ->queue()->assertCount(1)->back()
94
            ->dispatched()->assertCount(1)->back()
95
            ->acknowledged()->assertEmpty()->back()
96
            ->rejected()->assertEmpty()->back()
97
            ->process()
98
            ->queue()->assertEmpty()->back()
99
            ->dispatched()->assertCount(1)->back()
100
            ->acknowledged()->assertCount(1)->back()
101
            ->rejected()->assertEmpty()->back()
102
        ;
103
    }
104
105
    /**
106
     * @test
107
     */
108
    public function can_disable_intercept(): void
109
    {
110
        self::bootKernel();
111
112
        $this->messenger()->unblock();
113
114
        $this->messenger()->queue()->assertEmpty();
115
        $this->messenger()->acknowledged()->assertEmpty();
116
        $this->messenger()->dispatched()->assertEmpty();
117
118
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
119
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
120
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
121
122
        $this->messenger()->queue()->assertEmpty();
123
        $this->messenger()->queue()->assertNotContains(MessageA::class);
124
        $this->messenger()->dispatched()->assertCount(3);
125
        $this->messenger()->dispatched()->assertContains(MessageA::class, 2);
126
        $this->messenger()->dispatched()->assertContains(MessageB::class, 1);
127
        $this->messenger()->acknowledged()->assertCount(3);
128
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 2);
129
        $this->messenger()->acknowledged()->assertContains(MessageB::class, 1);
130
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
131
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
132
    }
133
134
    /**
135
     * @test
136
     */
137
    public function disabling_intercept_with_items_on_queue_processes_all(): void
138
    {
139
        self::bootKernel();
140
141
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
142
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
143
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
144
145
        $this->messenger()->queue()->assertCount(3);
146
147
        $this->messenger()->process();
148
149
        $this->messenger()->queue()->assertEmpty();
150
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
151
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
152
    }
153
154
    /**
155
     * @test
156
     */
157
    public function unblocking_processes_existing_messages_on_queue(): void
158
    {
159
        self::bootKernel();
160
161
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
162
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
163
164
        $this->messenger()->queue()->assertCount(2);
165
        $this->messenger()->acknowledged()->assertEmpty();
166
167
        $this->messenger()->unblock();
168
169
        $this->messenger()->queue()->assertEmpty();
170
        $this->messenger()->acknowledged()->assertCount(2);
171
    }
172
173
    /**
174
     * @test
175
     */
176
    public function can_access_envelope_collection_items_via_first(): void
177
    {
178
        self::bootKernel();
179
180
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
181
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
182
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA(true));
183
184
        $this->messenger()->queue()->assertCount(3);
185
186
        $this->assertSame($m1, $this->messenger()->queue()->first()->getMessage());
0 ignored issues
show
Bug introduced by
The method getMessage() does not exist on Zenstruck\Messenger\Test\TestEnvelope. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

186
        $this->assertSame($m1, $this->messenger()->queue()->first()->/** @scrutinizer ignore-call */ getMessage());
Loading history...
187
        $this->assertSame($m2, $this->messenger()->queue()->first(MessageB::class)->getMessage());
188
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(Envelope $e) => $e->getMessage()->fail)->getMessage());
189
        $this->assertSame($m3, $this->messenger()->queue()->first(fn($e) => $e->getMessage()->fail)->getMessage());
190
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(MessageA $m) => $m->fail)->getMessage());
191
    }
192
193
    /**
194
     * @test
195
     */
196
    public function envelope_collection_first_throws_exception_if_no_match(): void
197
    {
198
        self::bootKernel();
199
200
        $this->expectException(\RuntimeException::class);
201
202
        $this->messenger()->queue()->first();
203
    }
204
205
    /**
206
     * @test
207
     */
208
    public function can_make_stamp_assertions_on_test_envelope(): void
209
    {
210
        self::bootKernel();
211
212
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(), [new DelayStamp(1000)]);
213
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
214
215
        $this->messenger()->queue()->first()->assertHasStamp(DelayStamp::class);
216
        $this->messenger()->queue()->first(MessageB::class)->assertNotHasStamp(DelayStamp::class);
217
218
        Assert::that(fn() => $this->messenger()->queue()->first()->assertHasStamp(SerializerStamp::class))
219
            ->throws(AssertionFailedError::class, \sprintf('Expected to find stamp "%s" but did not.', SerializerStamp::class))
220
        ;
221
    }
222
223
    /**
224
     * @test
225
     */
226
    public function cannot_access_queue_if_none_registered(): void
227
    {
228
        self::bootKernel(['environment' => 'test']);
229
230
        $this->expectException(\LogicException::class);
231
        $this->expectExceptionMessage('No transports registered');
232
233
        $this->messenger();
234
    }
235
236
    /**
237
     * @test
238
     */
239
    public function accessing_transport_boots_kernel_if_not_yet_booted(): void
240
    {
241
        $this->messenger()->queue()->assertEmpty();
242
    }
243
244
    /**
245
     * @test
246
     */
247
    public function can_interact_with_multiple_queues(): void
248
    {
249
        self::bootKernel(['environment' => 'multi_transport']);
250
251
        $this->messenger('async1')->queue()->assertEmpty();
252
        $this->messenger('async2')->queue()->assertEmpty();
253
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
254
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
255
256
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
257
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
258
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
259
260
        $this->messenger('async1')->queue()->assertCount(2);
261
        $this->messenger('async2')->queue()->assertEmpty();
262
        $this->messenger('async1')->queue()->assertContains(MessageA::class);
263
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 2);
264
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
265
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
266
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
267
268
        $this->messenger('async1')->process(1);
269
270
        $this->messenger('async1')->queue()->assertCount(1);
271
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 1);
272
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
273
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
274
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
275
276
        $this->messenger('async1')->process();
277
278
        $this->messenger('async1')->queue()->assertEmpty();
279
        $this->messenger('async2')->queue()->assertEmpty();
280
        $this->messenger('async2')->acknowledged()->assertCount(1);
281
        $this->messenger('async2')->acknowledged()->assertContains(MessageB::class, 1);
282
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
283
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
284
    }
285
286
    /**
287
     * @test
288
     */
289
    public function can_enable_intercept(): void
290
    {
291
        self::bootKernel(['environment' => 'multi_transport']);
292
293
        $this->messenger('async2')->intercept();
294
295
        $this->messenger('async2')->queue()->assertEmpty();
296
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);
297
298
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
299
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
300
301
        $this->messenger('async2')->queue()->assertCount(2);
302
    }
303
304
    /**
305
     * @test
306
     */
307
    public function cannot_access_queue_that_does_not_exist(): void
308
    {
309
        self::bootKernel(['environment' => 'multi_transport']);
310
311
        $this->expectException(\InvalidArgumentException::class);
312
        $this->expectExceptionMessage('"invalid" not registered');
313
314
        $this->messenger('invalid');
315
    }
316
317
    /**
318
     * @test
319
     */
320
    public function cannot_access_queue_that_is_not_test_transport(): void
321
    {
322
        self::bootKernel(['environment' => 'multi_transport']);
323
324
        $this->expectException(\LogicException::class);
325
        $this->expectExceptionMessage('Transport "async3" needs to be set to "test://" in your test config to use this feature.');
326
327
        $this->messenger('async3');
328
    }
329
330
    /**
331
     * @test
332
     */
333
    public function queue_name_is_required_if_using_multiple_transports(): void
334
    {
335
        self::bootKernel(['environment' => 'multi_transport']);
336
337
        $this->expectException(\LogicException::class);
338
        $this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3), you must specify a name.');
339
340
        $this->messenger();
341
    }
342
343
    /**
344
     * @test
345
     */
346
    public function can_access_message_objects_on_queue(): void
347
    {
348
        self::bootKernel();
349
350
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
351
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
352
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
353
354
        $this->assertSame([$m1, $m2, $m3], $this->messenger()->queue()->messages());
355
        $this->assertSame([$m1, $m3], $this->messenger()->queue()->messages(MessageA::class));
356
        $this->assertSame([$m2], $this->messenger()->queue()->messages(MessageB::class));
357
    }
358
359
    /**
360
     * @test
361
     */
362
    public function can_access_envelopes_on_envelope_collection(): void
363
    {
364
        self::bootKernel();
365
366
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
367
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
368
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
369
370
        $messages = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), $this->messenger()->queue()->all());
371
        $messagesFromIterator = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), \iterator_to_array($this->messenger()->queue()));
372
373
        $this->assertSame([$m1, $m2, $m3], $messages);
374
        $this->assertSame([$m1, $m2, $m3], $messagesFromIterator);
375
    }
376
377
    /**
378
     * @test
379
     */
380
    public function can_access_sent_acknowledged_and_rejected(): void
381
    {
382
        self::bootKernel();
383
384
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA(true));
385
        self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
386
387
        $this->assertCount(2, $this->messenger()->queue());
388
        $this->assertCount(2, $this->messenger()->dispatched());
389
        $this->assertCount(0, $this->messenger()->acknowledged());
390
        $this->assertCount(0, $this->messenger()->rejected());
391
392
        $this->messenger()->process();
393
394
        $this->assertCount(0, $this->messenger()->queue());
395
        $this->assertCount(2, $this->messenger()->dispatched());
396
        $this->assertCount(1, $this->messenger()->acknowledged());
397
        $this->assertCount(1, $this->messenger()->rejected());
398
    }
399
400
    /**
401
     * @test
402
     */
403
    public function cannot_access_queue_if_bundle_not_enabled(): void
404
    {
405
        self::$class = NoBundleKernel::class;
406
        self::bootKernel(['environment' => 'no_bundle']);
407
        self::$class = null;
408
409
        $this->expectException(\LogicException::class);
410
        $this->expectExceptionMessage('Cannot access transport - is ZenstruckMessengerTestBundle enabled in your test environment?');
411
412
        $this->messenger();
413
    }
414
415
    /**
416
     * @test
417
     */
418
    public function can_configure_throwing_exceptions(): void
419
    {
420
        self::bootKernel();
421
422
        $this->messenger()->throwExceptions();
423
424
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
425
426
        $this->expectException(\RuntimeException::class);
427
        $this->expectExceptionMessage('handling failed...');
428
429
        $this->messenger()->process();
430
    }
431
432
    /**
433
     * @test
434
     */
435
    public function can_configure_throwing_exceptions_with_intercept_disabled(): void
436
    {
437
        self::bootKernel();
438
439
        $this->messenger()->throwExceptions()->unblock();
440
441
        $this->expectException(\RuntimeException::class);
442
        $this->expectExceptionMessage('handling failed...');
443
444
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
445
    }
446
447
    /**
448
     * @test
449
     */
450
    public function can_disable_exception_catching_in_transport_config(): void
451
    {
452
        self::bootKernel(['environment' => 'multi_transport']);
453
454
        $this->expectException(\RuntimeException::class);
455
        $this->expectExceptionMessage('handling failed...');
456
457
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
458
    }
459
460
    /**
461
     * @test
462
     */
463
    public function can_re_enable_exception_catching_if_disabled_in_transport_config(): void
464
    {
465
        self::bootKernel(['environment' => 'multi_transport']);
466
467
        $this->messenger('async2')->catchExceptions();
468
469
        $this->messenger('async2')->rejected()->assertEmpty();
470
471
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
472
473
        $this->messenger('async2')->rejected()->assertCount(1);
474
    }
475
476
    /**
477
     * @test
478
     */
479
    public function transport_data_is_persisted_between_requests_and_kernel_shutdown(): void
480
    {
481
        self::bootKernel();
482
483
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
484
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
485
486
        $this->messenger()->queue()->assertCount(2);
487
488
        self::ensureKernelShutdown();
489
        self::bootKernel();
490
491
        $this->messenger()->queue()->assertCount(2);
492
493
        $this->messenger()->process();
494
495
        self::ensureKernelShutdown();
496
        self::bootKernel();
497
498
        $this->messenger()->queue()->assertEmpty();
499
        $this->messenger()->dispatched()->assertCount(2);
500
        $this->messenger()->acknowledged()->assertCount(1);
501
        $this->messenger()->rejected()->assertCount(1);
502
503
        self::ensureKernelShutdown();
504
505
        $client = self::createClient();
506
507
        $client->request('GET', '/dispatch');
508
509
        $this->messenger()->queue()->assertCount(1);
510
        $this->messenger()->dispatched()->assertCount(3);
511
        $this->messenger()->acknowledged()->assertCount(1);
512
        $this->messenger()->rejected()->assertCount(1);
513
514
        $client->request('GET', '/dispatch');
515
516
        $this->messenger()->queue()->assertCount(2);
517
        $this->messenger()->dispatched()->assertCount(4);
518
        $this->messenger()->acknowledged()->assertCount(1);
519
        $this->messenger()->rejected()->assertCount(1);
520
    }
521
522
    /**
523
     * @test
524
     */
525
    public function can_reset_transport_data(): void
526
    {
527
        self::bootKernel();
528
529
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
530
531
        $this->messenger()->queue()->assertNotEmpty();
532
533
        $this->messenger()->reset();
534
535
        $this->messenger()->queue()->assertEmpty();
536
    }
537
538
    /**
539
     * @test
540
     */
541
    public function disabling_intercept_is_remembered_between_kernel_reboots(): void
542
    {
543
        self::bootKernel();
544
545
        $this->messenger()->unblock();
546
547
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
548
549
        $this->messenger()->queue()->assertEmpty();
550
        $this->messenger()->dispatched()->assertCount(1);
551
552
        self::ensureKernelShutdown();
553
        self::bootKernel();
554
555
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
556
557
        $this->messenger()->queue()->assertEmpty();
558
        $this->messenger()->dispatched()->assertCount(2);
559
    }
560
561
    /**
562
     * @test
563
     */
564
    public function throwing_exceptions_is_remembered_between_kernel_reboots(): void
565
    {
566
        self::bootKernel();
567
568
        $this->messenger()->throwExceptions();
569
570
        self::ensureKernelShutdown();
571
        self::bootKernel();
572
573
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
574
575
        $this->expectException(\RuntimeException::class);
576
        $this->expectErrorMessage('handling failed...');
577
578
        $this->messenger()->process();
579
    }
580
581
    /**
582
     * @test
583
     */
584
    public function can_manually_send_message_to_transport_and_process(): void
585
    {
586
        self::bootKernel();
587
588
        $this->messenger()->queue()->assertEmpty();
589
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
590
591
        $this->messenger()->send(Envelope::wrap(new MessageA()));
592
593
        $this->messenger()->queue()->assertCount(1);
594
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
595
596
        $this->messenger()->process();
597
598
        $this->messenger()->queue()->assertEmpty();
599
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
600
    }
601
602
    /**
603
     * @test
604
     */
605
    public function process_all_is_recursive(): void
606
    {
607
        self::bootKernel();
608
609
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
610
611
        $this->messenger()->queue()->assertCount(1);
612
        $this->messenger()->queue()->assertContains(MessageD::class, 1);
613
614
        $this->messenger()->process();
615
616
        $this->messenger()->queue()->assertEmpty();
617
        $this->messenger()->dispatched()->assertCount(3);
618
        $this->messenger()->acknowledged()->assertCount(3);
619
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
620
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
621
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
622
    }
623
624
    /**
625
     * @test
626
     */
627
    public function process_x_messages_is_recursive(): void
628
    {
629
        self::bootKernel();
630
631
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
632
633
        $this->messenger()->queue()->assertCount(1);
634
        $this->messenger()->queue()->assertContains(MessageD::class, 1);
635
636
        $this->messenger()->process(1);
637
638
        $this->messenger()->queue()->assertCount(1);
639
        $this->messenger()->queue()->assertContains(MessageE::class, 1);
640
        $this->messenger()->acknowledged()->assertCount(1);
641
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
642
643
        $this->messenger()->process(2);
644
645
        $this->messenger()->queue()->assertEmpty();
646
        $this->messenger()->acknowledged()->assertCount(3);
647
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
648
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
649
    }
650
651
    /**
652
     * @test
653
     */
654
    public function process_x_recursive_when_intercept_disabled(): void
655
    {
656
        self::bootKernel();
657
658
        $this->messenger()->unblock();
659
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
660
661
        $this->messenger()->acknowledged()->assertCount(3);
662
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
663
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
664
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
665
    }
666
667
    /**
668
     * @test
669
     */
670
    public function fails_if_trying_to_process_more_messages_than_can_be_processed(): void
671
    {
672
        self::bootKernel();
673
674
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
675
676
        Assert::that(fn() => $this->messenger()->process(2))->throws(function(AssertionFailedError $e) {
677
            $this->assertStringContainsString('Expected to process 2 messages but only processed 1.', $e->getMessage());
678
            $this->messenger()->queue()->assertEmpty();
679
            $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
680
        });
681
    }
682
683
    /**
684
     * @test
685
     */
686
    public function process_or_fail_processes_messages(): void
687
    {
688
        self::bootKernel();
689
690
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
691
692
        $this->messenger()->queue()->assertCount(1);
693
        $this->messenger()->queue()->assertContains(MessageA::class, 1);
694
695
        $this->messenger()->processOrFail();
696
697
        $this->messenger()->queue()->assertEmpty();
698
        $this->messenger()->acknowledged()->assertCount(1);
699
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
700
    }
701
702
    /**
703
     * @test
704
     */
705
    public function process_or_fail_fails_if_no_messages_on_queue(): void
706
    {
707
        self::bootKernel();
708
709
        Assert::that(fn() => $this->messenger()->processOrFail())
710
            ->throws(AssertionFailedError::class, 'No messages to process.')
711
        ;
712
    }
713
714
    /**
715
     * @test
716
     */
717
    public function envelope_collection_assertions(): void
718
    {
719
        self::bootKernel();
720
721
        Assert::that(fn() => $this->messenger()->dispatched()->assertCount(2))
722
            ->throws(AssertionFailedError::class, 'Expected 2 messages but 0 messages found.')
723
        ;
724
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class))
725
            ->throws(AssertionFailedError::class, \sprintf('Message "%s" not found.', MessageA::class))
726
        ;
727
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotEmpty())
728
            ->throws(AssertionFailedError::class, 'Expected some messages but found none.')
729
        ;
730
731
        self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
732
733
        Assert::that(fn() => $this->messenger()->dispatched()->assertEmpty())
734
            ->throws(AssertionFailedError::class, 'Expected 0 messages but 1 messages found.')
735
        ;
736
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class, 2))
737
            ->throws(AssertionFailedError::class, \sprintf('Expected to find "%s" 2 times but found 1 times.', MessageA::class))
738
        ;
739
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotContains(MessageA::class))
740
            ->throws(AssertionFailedError::class, \sprintf('Found message "%s" but should not.', MessageA::class))
741
        ;
742
    }
743
744
    /**
745
     * @test
746
     */
747
    public function messenger_worker_events_are_dispatched_when_processing(): void
748
    {
749
        $messages = [];
750
751
        self::bootKernel();
752
753
        self::getContainer()->get('event_dispatcher')->addListener(
754
            WorkerMessageHandledEvent::class,
755
            static function(WorkerMessageHandledEvent $event) use (&$messages) {
756
                $messages[] = $event->getEnvelope()->getMessage();
757
            }
758
        );
759
760
        self::getContainer()->get(MessageBusInterface::class)->dispatch($message = new MessageA());
761
762
        $this->messenger()->process();
763
764
        $this->assertCount(1, $messages);
765
        $this->assertSame($message, $messages[0]);
766
    }
767
768
    protected static function bootKernel(array $options = []): KernelInterface
769
    {
770
        return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options));
771
    }
772
773
    protected static function getContainer(): ContainerInterface
774
    {
775
        if (\method_exists(parent::class, 'getContainer')) {
776
            return parent::getContainer();
777
        }
778
779
        return self::$container;
780
    }
781
}
782