Completed
Push — 1.x ( 88b846...13d125 )
by Kevin
15s queued 12s
created

TestTransport::processOrFail()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
c 0
b 0
f 0
nc 1
nop 1
dl 0
loc 5
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use PHPUnit\Framework\Assert as PHPUnit;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Messenger\Test\EnvelopeCollection;
16
17
/**
 * In-memory Messenger transport for use in tests.
 *
 * Sent envelopes are recorded in static, per-transport-name collections
 * (dispatched, queue, acknowledged, rejected) so they can be inspected and
 * processed on demand. Because the state is static and keyed by transport
 * name, it is shared by every instance created with the same name.
 *
 * @author Kevin Bond <[email protected]>
 */
final class TestTransport implements TransportInterface
{
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
    ];

    private string $name;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    /**
     * Options resolved at construction (defaults merged with the passed
     * options). Used as the fallback once the shared option state was reset.
     *
     * @var array<string, mixed>
     */
    private array $options;

    /** @var array<string, bool> */
    private static array $intercept = [];

    /** @var array<string, bool> */
    private static array $catchExceptions = [];

    /** @var array<string, Envelope[]> */
    private static array $dispatched = [];

    /** @var array<string, Envelope[]> */
    private static array $acknowledged = [];

    /** @var array<string, Envelope[]> */
    private static array $rejected = [];

    /**
     * Pending envelopes per transport name, in the order they were sent.
     * Keys may have gaps after removals; readers re-index with array_values().
     *
     * @var array<string, Envelope[]>
     */
    private static array $queue = [];

    /**
     * @internal
     *
     * @param string               $name       key under which this transport's shared state is stored
     * @param MessageBusInterface  $bus        bus handed to the worker created by {@see process()}
     * @param SerializerInterface  $serializer used by {@see send()} to round-trip every envelope
     * @param array<string, mixed> $options    overrides for the "intercept" and "catch_exceptions" defaults
     */
    public function __construct(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = [])
    {
        $this->options = \array_merge(self::DEFAULT_OPTIONS, $options);

        $this->name = $name;
        $this->bus = $bus;
        $this->serializer = $serializer;

        // only seed the shared flags when unset so a flag changed through an
        // earlier instance with the same name is kept
        self::$intercept[$name] ??= $this->options['intercept'];
        self::$catchExceptions[$name] ??= $this->options['catch_exceptions'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        if ($this->hasMessagesToProcess()) {
            // process any messages currently on queue
            $this->process();
        }

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Makes {@see process()} leave handler failures to the worker instead of
     * re-throwing them.
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Makes {@see process()} re-throw the throwable of any failed message.
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Processes messages on the queue. This is done recursively so if handling
     * a message dispatches more messages, these will be processed as well (up
     * to $number).
     *
     * When $number is greater than 0, a PHPUnit assertion verifies that exactly
     * that many messages were processed. Any other value places no limit.
     *
     * @param int $number the number of messages to process (-1 for all)
     */
    public function process(int $number = -1): self
    {
        $eventDispatcher = new EventDispatcher();
        $processCount = 0;

        $eventDispatcher->addListener(
            WorkerRunningEvent::class,
            static function(WorkerRunningEvent $event) use (&$processCount): void {
                if ($event->isWorkerIdle()) {
                    // stop worker if no messages to process
                    $event->getWorker()->stop();

                    return;
                }

                // a non-idle running event is counted as one processed message
                ++$processCount;
            }
        );

        if ($number > 0) {
            // stop if limit was placed on number to process
            $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($number));
        }

        if (!$this->isCatchingExceptions()) {
            $eventDispatcher->addListener(
                WorkerMessageFailedEvent::class,
                static function(WorkerMessageFailedEvent $event): void {
                    throw $event->getThrowable();
                }
            );
        }

        $worker = new Worker([$this], $this->bus, $eventDispatcher);
        $worker->run(['sleep' => 0]);

        if ($number > 0) {
            PHPUnit::assertSame($number, $processCount, "Expected to process {$number} messages but only {$processCount} was processed.");
        }

        return $this;
    }

    /**
     * Works the same as {@see process()} but fails if no messages on queue.
     */
    public function processOrFail(int $number = -1): self
    {
        PHPUnit::assertTrue($this->hasMessagesToProcess(), 'No messages to process.');

        return $this->process($number);
    }

    /**
     * @return EnvelopeCollection the envelopes currently waiting on the queue
     */
    public function queue(): EnvelopeCollection
    {
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to {@see send()} since the last reset
     */
    public function dispatched(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$dispatched[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to {@see ack()} since the last reset
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$acknowledged[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to {@see reject()} since the last reset
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$rejected[$this->name] ?? []));
    }

    /**
     * @internal
     *
     * @return iterable<Envelope> a re-indexed snapshot of the queued envelopes
     */
    public function get(): iterable
    {
        return \array_values(self::$queue[$this->name] ?? []);
    }

    /**
     * @internal
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;
        $this->removeFromQueue($envelope);
    }

    /**
     * @internal
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;
        $this->removeFromQueue($envelope);
    }

    /**
     * Records the envelope as dispatched, appends it to the queue and, when
     * not intercepting, processes the queue right away.
     */
    public function send(Envelope $envelope): Envelope
    {
        // ensure serialization works (todo configurable? better error on failure?)
        $this->serializer->decode($this->serializer->encode($envelope));

        self::$dispatched[$this->name][] = $envelope;

        // appended rather than keyed by the message's object hash: sending the
        // same message object again must add a second entry, not overwrite
        // the first
        self::$queue[$this->name][] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = [];
        self::$dispatched[$this->name] = [];
        self::$acknowledged[$this->name] = [];
        self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     *
     * Instances that already exist fall back to the options they were
     * constructed with until a flag is set again.
     */
    public static function resetAll(): void
    {
        self::$queue = self::$dispatched = self::$acknowledged = self::$rejected = self::$intercept = self::$catchExceptions = [];
    }

    private function isIntercepting(): bool
    {
        // the shared flag is missing after resetAll(); use this instance's own option then
        return self::$intercept[$this->name] ?? $this->options['intercept'];
    }

    private function isCatchingExceptions(): bool
    {
        // the shared flag is missing after resetAll(); use this instance's own option then
        return self::$catchExceptions[$this->name] ?? $this->options['catch_exceptions'];
    }

    private function hasMessagesToProcess(): bool
    {
        return !empty(self::$queue[$this->name]);
    }

    /**
     * Removes the first queued envelope that wraps the same message object as
     * the given envelope. Matching is done on the message because the envelope
     * handed to ack()/reject() is not necessarily the instance that was sent.
     */
    private function removeFromQueue(Envelope $envelope): void
    {
        $message = $envelope->getMessage();

        foreach (self::$queue[$this->name] ?? [] as $index => $queued) {
            if ($queued->getMessage() === $message) {
                unset(self::$queue[$this->name][$index]);

                return;
            }
        }
    }
}
253