Completed
Push — 1.x ( 13376b...53750d )
by Kevin
29s queued 13s
created

TestTransport::isRetriesDisabled()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
13
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
14
use Symfony\Component\Messenger\Transport\TransportInterface;
15
use Symfony\Component\Messenger\Worker;
16
use Zenstruck\Assert;
17
use Zenstruck\Messenger\Test\EnvelopeCollection;
18
19
/**
20
 * @author Kevin Bond <[email protected]>
21
 */
22
final class TestTransport implements TransportInterface
23
{
24
    private const DEFAULT_OPTIONS = [
25
        'intercept' => true,
26
        'catch_exceptions' => true,
27
        'test_serialization' => true,
28
        'disable_retries' => true,
29
    ];
30
31
    private string $name;
32
    private EventDispatcherInterface $dispatcher;
33
    private MessageBusInterface $bus;
34
    private SerializerInterface $serializer;
35
36
    /** @var array<string, bool> */
37
    private static array $intercept = [];
38
39
    /** @var array<string, bool> */
40
    private static array $catchExceptions = [];
41
42
    /** @var array<string, bool> */
43
    private static array $testSerialization = [];
44
45
    /** @var array<string, bool> */
46
    private static array $disableRetries = [];
47
48
    /** @var array<string, Envelope[]> */
49
    private static array $dispatched = [];
50
51
    /** @var array<string, Envelope[]> */
52
    private static array $acknowledged = [];
53
54
    /** @var array<string, Envelope[]> */
55
    private static array $rejected = [];
56
57
    /** @var array<string, Envelope[]> */
58
    private static array $queue = [];
59
60
    /**
61
     * @internal
62
     */
63
    public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
64
    {
65
        $options = \array_merge(self::DEFAULT_OPTIONS, $options);
66
67
        $this->name = $name;
68
        $this->dispatcher = $dispatcher;
69
        $this->bus = $bus;
70
        $this->serializer = $serializer;
71
72
        self::$intercept[$name] ??= $options['intercept'];
73
        self::$catchExceptions[$name] ??= $options['catch_exceptions'];
74
        self::$testSerialization[$name] ??= $options['test_serialization'];
75
        self::$disableRetries[$name] ??= $options['disable_retries'];
76
    }
77
78
    /**
79
     * Processes any messages on the queue and processes future messages
80
     * immediately.
81
     */
82
    public function unblock(): self
83
    {
84
        if ($this->hasMessagesToProcess()) {
85
            // process any messages currently on queue
86
            $this->process();
87
        }
88
89
        self::$intercept[$this->name] = false;
90
91
        return $this;
92
    }
93
94
    /**
95
     * Intercepts any future messages sent to queue.
96
     */
97
    public function intercept(): self
98
    {
99
        self::$intercept[$this->name] = true;
100
101
        return $this;
102
    }
103
104
    public function catchExceptions(): self
105
    {
106
        self::$catchExceptions[$this->name] = true;
107
108
        return $this;
109
    }
110
111
    public function throwExceptions(): self
112
    {
113
        self::$catchExceptions[$this->name] = false;
114
115
        return $this;
116
    }
117
118
    /**
119
     * Processes messages on the queue. This is done recursively so if handling
120
     * a message dispatches more messages, these will be processed as well (up
121
     * to $number).
122
     *
123
     * @param int $number the number of messages to process (-1 for all)
124
     */
125
    public function process(int $number = -1): self
126
    {
127
        $processCount = 0;
128
129
        // keep track of added listeners/subscribers so we can remove after
130
        $listeners = [];
131
132
        $this->dispatcher->addListener(
133
            WorkerRunningEvent::class,
134
            $listeners[WorkerRunningEvent::class] = static function(WorkerRunningEvent $event) use (&$processCount) {
135
                if ($event->isWorkerIdle()) {
136
                    // stop worker if no messages to process
137
                    $event->getWorker()->stop();
138
139
                    return;
140
                }
141
142
                ++$processCount;
143
            }
144
        );
145
146
        if ($number > 0) {
147
            // stop if limit was placed on number to process
148
            $this->dispatcher->addSubscriber($listeners[] = new StopWorkerOnMessageLimitListener($number));
149
        }
150
151
        if (!$this->isCatchingExceptions()) {
152
            $this->dispatcher->addListener(
153
                WorkerMessageFailedEvent::class,
154
                $listeners[WorkerMessageFailedEvent::class] = static function(WorkerMessageFailedEvent $event) {
155
                    throw $event->getThrowable();
156
                }
157
            );
158
        }
159
160
        $worker = new Worker([$this->name => $this], $this->bus, $this->dispatcher);
161
        $worker->run(['sleep' => 0]);
162
163
        // remove added listeners/subscribers
164
        foreach ($listeners as $event => $listener) {
165
            if ($listener instanceof EventSubscriberInterface) {
166
                $this->dispatcher->removeSubscriber($listener);
167
168
                continue;
169
            }
170
171
            $this->dispatcher->removeListener($event, $listener);
172
        }
173
174
        if ($number > 0) {
175
            Assert::that($processCount)->is($number, 'Expected to process {expected} messages but only processed {actual}.');
176
        }
177
178
        return $this;
179
    }
180
181
    /**
182
     * Works the same as {@see process()} but fails if no messages on queue.
183
     */
184
    public function processOrFail(int $number = -1): self
185
    {
186
        Assert::true($this->hasMessagesToProcess(), 'No messages to process.');
187
188
        return $this->process($number);
189
    }
190
191
    public function queue(): EnvelopeCollection
192
    {
193
        return new EnvelopeCollection($this, ...self::$queue[$this->name] ?? []);
194
    }
195
196
    public function dispatched(): EnvelopeCollection
197
    {
198
        return new EnvelopeCollection($this, ...self::$dispatched[$this->name] ?? []);
199
    }
200
201
    public function acknowledged(): EnvelopeCollection
202
    {
203
        return new EnvelopeCollection($this, ...self::$acknowledged[$this->name] ?? []);
204
    }
205
206
    public function rejected(): EnvelopeCollection
207
    {
208
        return new EnvelopeCollection($this, ...self::$rejected[$this->name] ?? []);
209
    }
210
211
    /**
212
     * @internal
213
     */
214
    public function get(): iterable
215
    {
216
        return self::$queue[$this->name] ? [\array_shift(self::$queue[$this->name])] : [];
217
    }
218
219
    /**
220
     * @internal
221
     */
222
    public function ack(Envelope $envelope): void
223
    {
224
        self::$acknowledged[$this->name][] = $envelope;
225
    }
226
227
    /**
228
     * @internal
229
     */
230
    public function reject(Envelope $envelope): void
231
    {
232
        self::$rejected[$this->name][] = $envelope;
233
    }
234
235
    public function send(Envelope $envelope): Envelope
236
    {
237
        if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) {
238
            // message is being retried, don't process
239
            return $envelope;
240
        }
241
242
        if ($this->shouldTestSerialization()) {
243
            Assert::try(
244
                fn() => $this->serializer->decode($this->serializer->encode($envelope)),
245
                'A problem occurred in the serialization process.'
246
            );
247
        }
248
249
        self::$dispatched[$this->name][] = $envelope;
250
        self::$queue[$this->name][] = $envelope;
251
252
        if (!$this->isIntercepting()) {
253
            $this->process();
254
        }
255
256
        return $envelope;
257
    }
258
259
    /**
260
     * Resets all the data for this transport.
261
     */
262
    public function reset(): void
263
    {
264
        self::$queue[$this->name] = self::$dispatched[$this->name] = self::$acknowledged[$this->name] = self::$rejected[$this->name] = [];
265
    }
266
267
    /**
268
     * Resets data and options for all transports.
269
     */
270
    public static function resetAll(): void
271
    {
272
        self::$queue = self::$dispatched = self::$acknowledged = self::$rejected = self::$intercept = self::$catchExceptions = [];
273
    }
274
275
    private function isIntercepting(): bool
276
    {
277
        return self::$intercept[$this->name];
278
    }
279
280
    private function isCatchingExceptions(): bool
281
    {
282
        return self::$catchExceptions[$this->name];
283
    }
284
285
    private function shouldTestSerialization(): bool
286
    {
287
        return self::$testSerialization[$this->name];
288
    }
289
290
    private function isRetriesDisabled(): bool
291
    {
292
        return self::$disableRetries[$this->name];
293
    }
294
295
    private function hasMessagesToProcess(): bool
296
    {
297
        return !empty(self::$queue[$this->name] ?? []);
298
    }
299
}
300