Passed
Push — 1.x ( d64946...b02e86 )
by Kevin
01:42
created

can_use_envelope_collection_back()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 23
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 17
c 1
b 0
f 1
nc 1
nop 0
dl 0
loc 23
rs 9.7
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Tests;
4
5
use PHPUnit\Framework\AssertionFailedError;
6
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\HttpKernel\KernelInterface;
9
use Symfony\Component\Messenger\Envelope;
10
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Stamp\DelayStamp;
13
use Symfony\Component\Messenger\Stamp\SerializerStamp;
14
use Zenstruck\Assert;
15
use Zenstruck\Messenger\Test\InteractsWithMessenger;
16
use Zenstruck\Messenger\Test\TestEnvelope;
17
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA;
18
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageAHandler;
19
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB;
20
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageBHandler;
21
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageC;
22
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageD;
23
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageE;
24
use Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageF;
25
use Zenstruck\Messenger\Test\Tests\Fixture\NoBundleKernel;
26
27
/**
 * Functional tests for the InteractsWithMessenger trait.
 *
 * Unless a test passes its own options, the kernel is booted in the
 * "single_transport" environment (see {@see self::bootKernel()}).
 *
 * @author Kevin Bond <[email protected]>
 */
final class InteractsWithMessengerTest extends WebTestCase
{
    use InteractsWithMessenger;

    /**
     * Dispatched messages stay on the queue (handlers untouched) until process() is called.
     *
     * @test
     */
    public function can_interact_with_queue(): void
    {
        self::bootKernel();

        $this->messenger()->queue()->assertEmpty();
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageB());
        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertCount(3);
        $this->messenger()->queue()->assertContains(MessageA::class);
        $this->messenger()->queue()->assertContains(MessageA::class, 2);
        $this->messenger()->queue()->assertContains(MessageB::class);
        $this->messenger()->queue()->assertContains(MessageB::class, 1);
        $this->messenger()->queue()->assertNotContains(MessageC::class);
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

        // Process only the first two messages (A, B); the second A must remain queued.
        $this->messenger()->process(2);

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->queue()->assertContains(MessageA::class, 1);
        $this->messenger()->queue()->assertNotContains(MessageB::class);
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);

        // Process whatever is left.
        $this->messenger()->process();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->queue()->assertNotContains(MessageA::class);
        $this->messenger()->queue()->assertNotContains(MessageB::class);
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
    }

    /**
     * back() on an envelope collection returns to the transport so assertions can be chained fluently.
     *
     * @test
     */
    public function can_use_envelope_collection_back(): void
    {
        self::bootKernel();

        $this->messenger()
            ->queue()->assertEmpty()->back()
            ->dispatched()->assertEmpty()->back()
            ->acknowledged()->assertEmpty()->back()
            ->rejected()->assertEmpty()
        ;

        self::messageBus()->dispatch(new MessageA());

        $this->messenger()
            ->queue()->assertCount(1)->back()
            ->dispatched()->assertCount(1)->back()
            ->acknowledged()->assertEmpty()->back()
            ->rejected()->assertEmpty()->back()
            ->process()
            ->queue()->assertEmpty()->back()
            ->dispatched()->assertCount(1)->back()
            ->acknowledged()->assertCount(1)->back()
            ->rejected()->assertEmpty()->back()
        ;
    }

    /**
     * After unblock(), dispatched messages are handled immediately and never sit on the queue.
     *
     * @test
     */
    public function can_disable_intercept(): void
    {
        self::bootKernel();

        $this->messenger()->unblock();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->acknowledged()->assertEmpty();
        $this->messenger()->dispatched()->assertEmpty();

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageB());
        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->queue()->assertNotContains(MessageA::class);
        $this->messenger()->dispatched()->assertCount(3);
        $this->messenger()->dispatched()->assertContains(MessageA::class, 2);
        $this->messenger()->dispatched()->assertContains(MessageB::class, 1);
        $this->messenger()->acknowledged()->assertCount(3);
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 2);
        $this->messenger()->acknowledged()->assertContains(MessageB::class, 1);
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
    }

    /**
     * Queued messages are all handled once the queue is processed.
     *
     * NOTE(review): despite the test name, this body calls process() and never
     * disables intercept; the unblock() path is covered by
     * unblocking_processes_existing_messages_on_queue(). Confirm whether the
     * name or the body is the intended one.
     *
     * @test
     */
    public function disabling_intercept_with_items_on_queue_processes_all(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageB());
        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertCount(3);

        $this->messenger()->process();

        $this->messenger()->queue()->assertEmpty();
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
    }

    /**
     * Calling unblock() with messages already queued drains the queue and acknowledges them.
     *
     * @test
     */
    public function unblocking_processes_existing_messages_on_queue(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageB());

        $this->messenger()->queue()->assertCount(2);
        $this->messenger()->acknowledged()->assertEmpty();

        $this->messenger()->unblock();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->acknowledged()->assertCount(2);
    }

    /**
     * first() accepts no filter, a message class name, or a callable typed on the envelope or message.
     *
     * @test
     */
    public function can_access_envelope_collection_items_via_first(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch($m1 = new MessageA());
        self::messageBus()->dispatch($m2 = new MessageB());
        self::messageBus()->dispatch($m3 = new MessageA(true));

        $this->messenger()->queue()->assertCount(3);

        $this->assertSame($m1, $this->messenger()->queue()->first()->getMessage());
        $this->assertSame($m2, $this->messenger()->queue()->first(MessageB::class)->getMessage());
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(Envelope $e) => $e->getMessage()->fail)->getMessage());
        $this->assertSame($m3, $this->messenger()->queue()->first(fn($e) => $e->getMessage()->fail)->getMessage());
        $this->assertSame($m3, $this->messenger()->queue()->first(fn(MessageA $m) => $m->fail)->getMessage());
    }

    /**
     * first() on an empty queue throws a RuntimeException.
     *
     * @test
     */
    public function envelope_collection_first_throws_exception_if_no_match(): void
    {
        self::bootKernel();

        $this->expectException(\RuntimeException::class);

        $this->messenger()->queue()->first();
    }

    /**
     * TestEnvelope exposes assertHasStamp()/assertNotHasStamp(), failing with a descriptive message.
     *
     * @test
     */
    public function can_make_stamp_assertions_on_test_envelope(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA(), [new DelayStamp(1000)]);
        self::messageBus()->dispatch(new MessageB());

        $this->messenger()->queue()->first()->assertHasStamp(DelayStamp::class);
        $this->messenger()->queue()->first(MessageB::class)->assertNotHasStamp(DelayStamp::class);

        Assert::that(fn() => $this->messenger()->queue()->first()->assertHasStamp(SerializerStamp::class))
            ->throws(AssertionFailedError::class, \sprintf('Expected to find stamp "%s" but did not.', SerializerStamp::class))
        ;
    }

    /**
     * In the "test" environment messenger() throws because no transports are registered.
     *
     * @test
     */
    public function cannot_access_queue_if_none_registered(): void
    {
        self::bootKernel(['environment' => 'test']);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('No transports registered');

        $this->messenger();
    }

    /**
     * messenger() can be used without an explicit bootKernel() call.
     *
     * @test
     */
    public function accessing_transport_boots_kernel_if_not_yet_booted(): void
    {
        $this->messenger()->queue()->assertEmpty();
    }

    /**
     * With several transports, each named transport tracks its own queue independently.
     *
     * @test
     */
    public function can_interact_with_multiple_queues(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->messenger('async1')->queue()->assertEmpty();
        $this->messenger('async2')->queue()->assertEmpty();
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageB());
        self::messageBus()->dispatch(new MessageA());

        // Both MessageA instances are queued on async1; MessageB has already been
        // handled and is not on the async2 queue.
        $this->messenger('async1')->queue()->assertCount(2);
        $this->messenger('async2')->queue()->assertEmpty();
        $this->messenger('async1')->queue()->assertContains(MessageA::class);
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 2);
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);

        $this->messenger('async1')->process(1);

        $this->messenger('async1')->queue()->assertCount(1);
        $this->messenger('async1')->queue()->assertContains(MessageA::class, 1);
        $this->messenger('async2')->queue()->assertNotContains(MessageB::class);
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);

        $this->messenger('async1')->process();

        $this->messenger('async1')->queue()->assertEmpty();
        $this->messenger('async2')->queue()->assertEmpty();
        $this->messenger('async2')->acknowledged()->assertCount(1);
        $this->messenger('async2')->acknowledged()->assertContains(MessageB::class, 1);
        $this->assertCount(2, self::getContainer()->get(MessageAHandler::class)->messages);
        $this->assertCount(1, self::getContainer()->get(MessageBHandler::class)->messages);
    }

    /**
     * intercept() makes the "async2" transport hold dispatched messages on its queue.
     *
     * @test
     */
    public function can_enable_intercept(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->messenger('async2')->intercept();

        $this->messenger('async2')->queue()->assertEmpty();
        $this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

        self::messageBus()->dispatch(new MessageB());
        self::messageBus()->dispatch(new MessageB());

        $this->messenger('async2')->queue()->assertCount(2);
    }

    /**
     * Requesting an unknown transport name throws an InvalidArgumentException.
     *
     * @test
     */
    public function cannot_access_queue_that_does_not_exist(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->expectException(\InvalidArgumentException::class);
        $this->expectExceptionMessage('"invalid" not registered');

        $this->messenger('invalid');
    }

    /**
     * Requesting a transport that is not configured as "test://" throws a LogicException.
     *
     * @test
     */
    public function cannot_access_queue_that_is_not_test_transport(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Transport "async3" needs to be set to "test://" in your test config to use this feature.');

        $this->messenger('async3');
    }

    /**
     * messenger() without a name throws when more than one transport is registered.
     *
     * @test
     */
    public function queue_name_is_required_if_using_multiple_transports(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3), you must specify a name.');

        $this->messenger();
    }

    /**
     * messages() returns the queued message objects in dispatch order, optionally filtered by class.
     *
     * @test
     */
    public function can_access_message_objects_on_queue(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch($m1 = new MessageA());
        self::messageBus()->dispatch($m2 = new MessageB());
        self::messageBus()->dispatch($m3 = new MessageA());

        $this->assertSame([$m1, $m2, $m3], $this->messenger()->queue()->messages());
        $this->assertSame([$m1, $m3], $this->messenger()->queue()->messages(MessageA::class));
        $this->assertSame([$m2], $this->messenger()->queue()->messages(MessageB::class));
    }

    /**
     * The envelope collection yields TestEnvelope items both via all() and via iteration.
     *
     * @test
     */
    public function can_access_envelopes_on_envelope_collection(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch($m1 = new MessageA());
        self::messageBus()->dispatch($m2 = new MessageB());
        self::messageBus()->dispatch($m3 = new MessageA());

        $messages = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), $this->messenger()->queue()->all());
        $messagesFromIterator = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), \iterator_to_array($this->messenger()->queue()));

        $this->assertSame([$m1, $m2, $m3], $messages);
        $this->assertSame([$m1, $m2, $m3], $messagesFromIterator);
    }

    /**
     * dispatched/acknowledged/rejected collections are countable and reflect processing results.
     *
     * @test
     */
    public function can_access_sent_acknowledged_and_rejected(): void
    {
        self::bootKernel();

        // MessageA(true) is the failing message: it is expected to end up rejected.
        self::messageBus()->dispatch(new MessageA(true));
        self::messageBus()->dispatch(new MessageB());

        $this->assertCount(2, $this->messenger()->queue());
        $this->assertCount(2, $this->messenger()->dispatched());
        $this->assertCount(0, $this->messenger()->acknowledged());
        $this->assertCount(0, $this->messenger()->rejected());

        $this->messenger()->process();

        $this->assertCount(0, $this->messenger()->queue());
        $this->assertCount(2, $this->messenger()->dispatched());
        $this->assertCount(1, $this->messenger()->acknowledged());
        $this->assertCount(1, $this->messenger()->rejected());
    }

    /**
     * messenger() throws a LogicException when booted with a kernel that lacks the test bundle.
     *
     * @test
     */
    public function cannot_access_queue_if_bundle_not_enabled(): void
    {
        self::$class = NoBundleKernel::class;

        try {
            self::bootKernel(['environment' => 'no_bundle']);
        } finally {
            // Always restore the default kernel class so a failed boot cannot
            // leak the override into subsequent tests via static state.
            self::$class = null;
        }

        $this->expectException(\LogicException::class);
        $this->expectExceptionMessage('Cannot access transport - is ZenstruckMessengerTestBundle enabled in your test environment?');

        $this->messenger();
    }

    /**
     * After throwExceptions(), a handler failure surfaces from process().
     *
     * @test
     */
    public function can_configure_throwing_exceptions(): void
    {
        self::bootKernel();

        $this->messenger()->throwExceptions();

        self::messageBus()->dispatch(new MessageA(true));

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        $this->messenger()->process();
    }

    /**
     * With throwExceptions() and unblock(), a handler failure surfaces from dispatch() itself.
     *
     * @test
     */
    public function can_configure_throwing_exceptions_with_intercept_disabled(): void
    {
        self::bootKernel();

        $this->messenger()->throwExceptions()->unblock();

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        self::messageBus()->dispatch(new MessageA(true));
    }

    /**
     * In the "multi_transport" environment a failing MessageB throws directly from dispatch().
     *
     * @test
     */
    public function can_disable_exception_catching_in_transport_config(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->expectException(\RuntimeException::class);
        $this->expectExceptionMessage('handling failed...');

        self::messageBus()->dispatch(new MessageB(true));
    }

    /**
     * catchExceptions() makes "async2" record a failing message as rejected instead of throwing.
     *
     * @test
     */
    public function can_re_enable_exception_catching_if_disabled_in_transport_config(): void
    {
        self::bootKernel(['environment' => 'multi_transport']);

        $this->messenger('async2')->catchExceptions();

        $this->messenger('async2')->rejected()->assertEmpty();

        self::messageBus()->dispatch(new MessageB(true));

        $this->messenger('async2')->rejected()->assertCount(1);
    }

    /**
     * Queue/dispatched/acknowledged/rejected data survives kernel reboots and client requests.
     *
     * @test
     */
    public function transport_data_is_persisted_between_requests_and_kernel_shutdown(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());
        self::messageBus()->dispatch(new MessageA(true));

        $this->messenger()->queue()->assertCount(2);

        self::ensureKernelShutdown();
        self::bootKernel();

        $this->messenger()->queue()->assertCount(2);

        $this->messenger()->process();

        self::ensureKernelShutdown();
        self::bootKernel();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->dispatched()->assertCount(2);
        $this->messenger()->acknowledged()->assertCount(1);
        $this->messenger()->rejected()->assertCount(1);

        self::ensureKernelShutdown();

        $client = self::createClient();

        // Each request to /dispatch is expected to add one message to the queue.
        $client->request('GET', '/dispatch');

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->dispatched()->assertCount(3);
        $this->messenger()->acknowledged()->assertCount(1);
        $this->messenger()->rejected()->assertCount(1);

        $client->request('GET', '/dispatch');

        $this->messenger()->queue()->assertCount(2);
        $this->messenger()->dispatched()->assertCount(4);
        $this->messenger()->acknowledged()->assertCount(1);
        $this->messenger()->rejected()->assertCount(1);
    }

    /**
     * reset() clears the queue.
     *
     * @test
     */
    public function can_reset_transport_data(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertNotEmpty();

        $this->messenger()->reset();

        $this->messenger()->queue()->assertEmpty();
    }

    /**
     * unblock() stays in effect after the kernel is shut down and booted again.
     *
     * @test
     */
    public function disabling_intercept_is_remembered_between_kernel_reboots(): void
    {
        self::bootKernel();

        $this->messenger()->unblock();

        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->dispatched()->assertCount(1);

        self::ensureKernelShutdown();
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->dispatched()->assertCount(2);
    }

    /**
     * throwExceptions() stays in effect after the kernel is shut down and booted again.
     *
     * @test
     */
    public function throwing_exceptions_is_remembered_between_kernel_reboots(): void
    {
        self::bootKernel();

        $this->messenger()->throwExceptions();

        self::ensureKernelShutdown();
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA(true));

        $this->expectException(\RuntimeException::class);
        // expectExceptionMessage() (not the PHP-error oriented expectErrorMessage())
        // is the matching API for an exception expectation, as used by the sibling tests.
        $this->expectExceptionMessage('handling failed...');

        $this->messenger()->process();
    }

    /**
     * send() places an envelope on the queue without going through the bus; process() then handles it.
     *
     * @test
     */
    public function can_manually_send_message_to_transport_and_process(): void
    {
        self::bootKernel();

        $this->messenger()->queue()->assertEmpty();
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);

        $this->messenger()->send(Envelope::wrap(new MessageA()));

        $this->messenger()->queue()->assertCount(1);
        $this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);

        $this->messenger()->process();

        $this->messenger()->queue()->assertEmpty();
        $this->assertCount(1, self::getContainer()->get(MessageAHandler::class)->messages);
    }

    /**
     * process() with no limit also handles messages produced while processing (D, then E, then F).
     *
     * @test
     */
    public function process_all_is_recursive(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageD());

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->queue()->assertContains(MessageD::class, 1);

        $this->messenger()->process();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->dispatched()->assertCount(3);
        $this->messenger()->acknowledged()->assertCount(3);
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
    }

    /**
     * process(n) counts messages produced while processing towards its limit.
     *
     * @test
     */
    public function process_x_messages_is_recursive(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageD());

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->queue()->assertContains(MessageD::class, 1);

        // Handling D leaves an E on the queue.
        $this->messenger()->process(1);

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->queue()->assertContains(MessageE::class, 1);
        $this->messenger()->acknowledged()->assertCount(1);
        $this->messenger()->acknowledged()->assertContains(MessageD::class, 1);

        // The remaining two (E and the F it produces) are handled by the next call.
        $this->messenger()->process(2);

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->acknowledged()->assertCount(3);
        $this->messenger()->acknowledged()->assertContains(MessageE::class, 1);
        $this->messenger()->acknowledged()->assertContains(MessageF::class, 1);
    }

    /**
     * process(n) fails the assertion when fewer than n messages were available, after handling those it could.
     *
     * @test
     */
    public function fails_if_trying_to_process_more_messages_than_can_be_processed(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());

        Assert::that(fn() => $this->messenger()->process(2))->throws(function(AssertionFailedError $e) {
            $this->assertStringContainsString('Expected to process 2 messages but only processed 1.', $e->getMessage());
            $this->messenger()->queue()->assertEmpty();
            $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
        });
    }

    /**
     * processOrFail() handles queued messages when there are some.
     *
     * @test
     */
    public function process_or_fail_processes_messages(): void
    {
        self::bootKernel();

        self::messageBus()->dispatch(new MessageA());

        $this->messenger()->queue()->assertCount(1);
        $this->messenger()->queue()->assertContains(MessageA::class, 1);

        $this->messenger()->processOrFail();

        $this->messenger()->queue()->assertEmpty();
        $this->messenger()->acknowledged()->assertCount(1);
        $this->messenger()->acknowledged()->assertContains(MessageA::class, 1);
    }

    /**
     * processOrFail() fails the assertion when the queue is empty.
     *
     * @test
     */
    public function process_or_fail_fails_if_no_messages_on_queue(): void
    {
        self::bootKernel();

        Assert::that(fn() => $this->messenger()->processOrFail())
            ->throws(AssertionFailedError::class, 'No messages to process.')
        ;
    }

    /**
     * Pins the failure messages produced by the envelope collection assertions.
     *
     * @test
     */
    public function envelope_collection_assertions(): void
    {
        self::bootKernel();

        // Failures against an empty collection.
        Assert::that(fn() => $this->messenger()->dispatched()->assertCount(2))
            ->throws(AssertionFailedError::class, 'Expected 2 messages but 0 messages found.')
        ;
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class))
            ->throws(AssertionFailedError::class, \sprintf('Message "%s" not found.', MessageA::class))
        ;
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotEmpty())
            ->throws(AssertionFailedError::class, 'Expected some messages but found none.')
        ;

        self::messageBus()->dispatch(new MessageA());

        // Failures against a collection holding a single MessageA.
        Assert::that(fn() => $this->messenger()->dispatched()->assertEmpty())
            ->throws(AssertionFailedError::class, 'Expected 0 messages but 1 messages found.')
        ;
        Assert::that(fn() => $this->messenger()->dispatched()->assertContains(MessageA::class, 2))
            ->throws(AssertionFailedError::class, \sprintf('Expected to find "%s" 2 times but found 1 times.', MessageA::class))
        ;
        Assert::that(fn() => $this->messenger()->dispatched()->assertNotContains(MessageA::class))
            ->throws(AssertionFailedError::class, \sprintf('Found message "%s" but should not.', MessageA::class))
        ;
    }

    /**
     * process() dispatches WorkerMessageHandledEvent for each handled message.
     *
     * @test
     */
    public function messenger_worker_events_are_dispatched_when_processing(): void
    {
        $messages = [];

        self::bootKernel();

        // Collect every message reported through the worker "handled" event.
        self::getContainer()->get('event_dispatcher')->addListener(
            WorkerMessageHandledEvent::class,
            static function(WorkerMessageHandledEvent $event) use (&$messages) {
                $messages[] = $event->getEnvelope()->getMessage();
            }
        );

        self::messageBus()->dispatch($message = new MessageA());

        $this->messenger()->process();

        $this->assertCount(1, $messages);
        $this->assertSame($message, $messages[0]);
    }

    /**
     * Boots the kernel, defaulting the environment to "single_transport".
     *
     * @param array $options Kernel options; any key given here overrides the default
     */
    protected static function bootKernel(array $options = []): KernelInterface
    {
        return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options));
    }

    /**
     * Returns the test container, using the parent's getContainer() when it
     * exists and falling back to the static $container property otherwise.
     */
    protected static function getContainer(): ContainerInterface
    {
        if (\method_exists(parent::class, 'getContainer')) {
            return parent::getContainer();
        }

        return self::$container;
    }

    /**
     * Fetches the message bus from the current container.
     *
     * Resolved on every call (never cached) so that it always belongs to the
     * most recently booted kernel in tests that reboot.
     */
    private static function messageBus(): MessageBusInterface
    {
        return self::getContainer()->get(MessageBusInterface::class);
    }
}
765