TestTransport::reset()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 1
b 0
f 0
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Symfony\Component\Messenger\Envelope;
7
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
8
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
9
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
10
use Symfony\Component\Messenger\MessageBusInterface;
11
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Assert;
16
use Zenstruck\Messenger\Test\EnvelopeCollection;
17
18
/**
19
 * @author Kevin Bond <[email protected]>
20
 */
21
final class TestTransport implements TransportInterface
{
    /**
     * Option values used when the constructor is not given an explicit value.
     */
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
        'test_serialization' => true,
        'disable_retries' => true,
    ];

    private string $name;
    private EventDispatcherInterface $dispatcher;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    // All state below is static and keyed by transport name, so it is shared
    // by every instance constructed with the same name.

    /** @var array<string, bool> */
    private static array $intercept = [];

    /** @var array<string, bool> */
    private static array $catchExceptions = [];

    /** @var array<string, bool> */
    private static array $testSerialization = [];

    /** @var array<string, bool> */
    private static array $disableRetries = [];

    /** @var array<string, Envelope[]> */
    private static array $dispatched = [];

    /** @var array<string, Envelope[]> */
    private static array $acknowledged = [];

    /** @var array<string, Envelope[]> */
    private static array $rejected = [];

    /** @var array<string, Envelope[]> */
    private static array $queue = [];

    /**
     * Options are merged over {@see self::DEFAULT_OPTIONS} and only seed the
     * static, per-name option state when no value exists yet for $name (`??=`).
     * A value already stored for the same name (e.g. changed via
     * {@see unblock()} or {@see throwExceptions()}) is therefore kept.
     *
     * @internal
     *
     * @param array<string,bool> $options
     */
    public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
    {
        $options = \array_merge(self::DEFAULT_OPTIONS, $options);

        $this->name = $name;
        $this->dispatcher = $dispatcher;
        $this->bus = $bus;
        $this->serializer = $serializer;

        self::$intercept[$name] ??= $options['intercept'];
        self::$catchExceptions[$name] ??= $options['catch_exceptions'];
        self::$testSerialization[$name] ??= $options['test_serialization'];
        self::$disableRetries[$name] ??= $options['disable_retries'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        if ($this->hasMessagesToProcess()) {
            // process any messages currently on queue
            $this->process();
        }

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Turns the "catch exceptions" option on for this transport name; while it
     * is on, {@see process()} does not register its rethrowing failure listener.
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Turns the "catch exceptions" option off for this transport name; while
     * it is off, {@see process()} rethrows the throwable of a failed message.
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Processes messages on the queue. This is done recursively so if handling
     * a message dispatches more messages, these will be processed as well (up
     * to $number).
     *
     * Temporary listeners/subscribers are registered on the dispatcher for the
     * duration of the run and are always removed again, even when the worker
     * throws.
     *
     * @param int $number the number of messages to process (-1 for all)
     */
    public function process(int $number = -1): self
    {
        $processCount = 0;

        // keep track of added listeners/subscribers so we can remove after
        $listeners = [];
        $subscribers = [];

        $this->dispatcher->addListener(
            WorkerRunningEvent::class,
            $listeners[WorkerRunningEvent::class] = static function(WorkerRunningEvent $event) use (&$processCount) {
                if ($event->isWorkerIdle()) {
                    // stop worker if no messages to process
                    $event->getWorker()->stop();

                    return;
                }

                ++$processCount;
            }
        );

        if ($number > 0) {
            // stop if limit was placed on number to process
            $this->dispatcher->addSubscriber($subscribers[] = new StopWorkerOnMessageLimitListener($number));
        }

        if (!$this->isCatchingExceptions()) {
            $this->dispatcher->addListener(
                WorkerMessageFailedEvent::class,
                $listeners[WorkerMessageFailedEvent::class] = static function(WorkerMessageFailedEvent $event) {
                    throw $event->getThrowable();
                }
            );
        }

        try {
            $worker = new Worker([$this->name => $this], $this->bus, $this->dispatcher);
            $worker->run(['sleep' => 0]);
        } finally {
            // Always remove added listeners/subscribers: when exceptions are
            // rethrown by the failure listener above, run() exits by throwing
            // and the listeners would otherwise stay on the dispatcher and
            // affect later process() calls.
            foreach ($listeners as $event => $listener) {
                $this->dispatcher->removeListener($event, $listener);
            }

            foreach ($subscribers as $subscriber) {
                $this->dispatcher->removeSubscriber($subscriber);
            }
        }

        if ($number > 0) {
            Assert::that($processCount)->is($number, 'Expected to process {expected} messages but only processed {actual}.');
        }

        return $this;
    }

    /**
     * Works the same as {@see process()} but fails if no messages on queue.
     */
    public function processOrFail(int $number = -1): self
    {
        Assert::true($this->hasMessagesToProcess(), 'No messages to process.');

        return $this->process($number);
    }

    /**
     * Envelopes currently waiting on this transport's queue.
     */
    public function queue(): EnvelopeCollection
    {
        return new EnvelopeCollection($this, ...self::$queue[$this->name] ?? []);
    }

    /**
     * Envelopes recorded by {@see send()} for this transport.
     */
    public function dispatched(): EnvelopeCollection
    {
        return new EnvelopeCollection($this, ...self::$dispatched[$this->name] ?? []);
    }

    /**
     * Envelopes recorded by {@see ack()} for this transport.
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection($this, ...self::$acknowledged[$this->name] ?? []);
    }

    /**
     * Envelopes recorded by {@see reject()} for this transport.
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection($this, ...self::$rejected[$this->name] ?? []);
    }

    /**
     * Removes and returns the first queued envelope (as a single-item array),
     * or an empty array when nothing is queued for this transport.
     *
     * @internal
     */
    public function get(): iterable
    {
        // empty() also covers a queue that was never initialised for this
        // name (nothing sent yet, or state cleared by resetAll()), which
        // would otherwise trigger an "undefined array key" warning.
        if (empty(self::$queue[$this->name])) {
            return [];
        }

        return [\array_shift(self::$queue[$this->name])];
    }

    /**
     * Records the envelope as acknowledged.
     *
     * @internal
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;
    }

    /**
     * Records the envelope as rejected.
     *
     * @internal
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;
    }

    /**
     * Records the envelope as dispatched and queues it. When the transport is
     * not intercepting, the queue is processed right away.
     *
     * Envelopes carrying a RedeliveryStamp are returned untouched (neither
     * recorded nor queued) while retries are disabled. When serialization
     * testing is enabled, the envelope is encoded and decoded through the
     * serializer first; the decoded result is discarded.
     */
    public function send(Envelope $envelope): Envelope
    {
        if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) {
            // message is being retried, don't process
            return $envelope;
        }

        if ($this->shouldTestSerialization()) {
            Assert::try(
                fn() => $this->serializer->decode($this->serializer->encode($envelope)),
                'A problem occurred in the serialization process.'
            );
        }

        self::$dispatched[$this->name][] = $envelope;
        self::$queue[$this->name][] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport (queue, dispatched, acknowledged
     * and rejected envelopes). Options are left untouched.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = self::$dispatched[$this->name] = self::$acknowledged[$this->name] = self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     *
     * Every option map is cleared (not only intercept/catch_exceptions):
     * the constructor seeds options with `??=`, so any value left behind here
     * would win over the options of the next transport built with that name.
     */
    public static function resetAll(): void
    {
        self::$queue = self::$dispatched = self::$acknowledged = self::$rejected = [];
        self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = [];
    }

    private function isIntercepting(): bool
    {
        return self::$intercept[$this->name];
    }

    private function isCatchingExceptions(): bool
    {
        return self::$catchExceptions[$this->name];
    }

    private function shouldTestSerialization(): bool
    {
        return self::$testSerialization[$this->name];
    }

    private function isRetriesDisabled(): bool
    {
        return self::$disableRetries[$this->name];
    }

    private function hasMessagesToProcess(): bool
    {
        return !empty(self::$queue[$this->name] ?? []);
    }
}
300