Passed
Push — 1.x ( cf2ec7...97fb25 )
by Kevin
01:29
created

TestTransport   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 261
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 31
eloc 84
c 2
b 0
f 0
dl 0
loc 261
rs 9.92

21 Methods

Rating   Name   Duplication   Size   Complexity  
A catchExceptions() 0 5 1
A processOrFail() 0 5 1
B process() 0 54 7
A throwExceptions() 0 5 1
A __construct() 0 12 1
A acknowledged() 0 3 1
A queue() 0 3 1
A unblock() 0 10 2
A intercept() 0 5 1
A rejected() 0 3 1
A dispatched() 0 3 1
A send() 0 17 3
A hasMessagesToProcess() 0 3 1
A ack() 0 3 1
A reset() 0 3 1
A get() 0 3 2
A resetAll() 0 3 1
A isIntercepting() 0 3 1
A reject() 0 3 1
A shouldTestSerialization() 0 3 1
A isCatchingExceptions() 0 3 1
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Assert;
16
use Zenstruck\Messenger\Test\EnvelopeCollection;
17
18
/**
19
 * @author Kevin Bond <[email protected]>
20
 */
21
final class TestTransport implements TransportInterface
22
{
23
    private const DEFAULT_OPTIONS = [
24
        'intercept' => true,
25
        'catch_exceptions' => true,
26
        'test_serialization' => true,
27
    ];
28
29
    private string $name;
30
    private EventDispatcherInterface $dispatcher;
31
    private MessageBusInterface $bus;
32
    private SerializerInterface $serializer;
33
34
    /** @var array<string, bool> */
35
    private static array $intercept = [];
36
37
    /** @var array<string, bool> */
38
    private static array $catchExceptions = [];
39
40
    /** @var array<string, bool> */
41
    private static array $testSerialization = [];
42
43
    /** @var array<string, Envelope[]> */
44
    private static array $dispatched = [];
45
46
    /** @var array<string, Envelope[]> */
47
    private static array $acknowledged = [];
48
49
    /** @var array<string, Envelope[]> */
50
    private static array $rejected = [];
51
52
    /** @var array<string, Envelope[]> */
53
    private static array $queue = [];
54
55
    /**
56
     * @internal
57
     */
58
    public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
59
    {
60
        $options = \array_merge(self::DEFAULT_OPTIONS, $options);
61
62
        $this->name = $name;
63
        $this->dispatcher = $dispatcher;
64
        $this->bus = $bus;
65
        $this->serializer = $serializer;
66
67
        self::$intercept[$name] ??= $options['intercept'];
68
        self::$catchExceptions[$name] ??= $options['catch_exceptions'];
69
        self::$testSerialization[$name] ??= $options['test_serialization'];
70
    }
71
72
    /**
73
     * Processes any messages on the queue and processes future messages
74
     * immediately.
75
     */
76
    public function unblock(): self
77
    {
78
        if ($this->hasMessagesToProcess()) {
79
            // process any messages currently on queue
80
            $this->process();
81
        }
82
83
        self::$intercept[$this->name] = false;
84
85
        return $this;
86
    }
87
88
    /**
89
     * Intercepts any future messages sent to queue.
90
     */
91
    public function intercept(): self
92
    {
93
        self::$intercept[$this->name] = true;
94
95
        return $this;
96
    }
97
98
    public function catchExceptions(): self
99
    {
100
        self::$catchExceptions[$this->name] = true;
101
102
        return $this;
103
    }
104
105
    public function throwExceptions(): self
106
    {
107
        self::$catchExceptions[$this->name] = false;
108
109
        return $this;
110
    }
111
112
    /**
113
     * Processes messages on the queue. This is done recursively so if handling
114
     * a message dispatches more messages, these will be processed as well (up
115
     * to $number).
116
     *
117
     * @param int $number the number of messages to process (-1 for all)
118
     */
119
    public function process(int $number = -1): self
120
    {
121
        $processCount = 0;
122
123
        // keep track of added listeners/subscribers so we can remove after
124
        $listeners = [];
125
126
        $this->dispatcher->addListener(
127
            WorkerRunningEvent::class,
128
            $listeners[WorkerRunningEvent::class] = static function(WorkerRunningEvent $event) use (&$processCount) {
129
                if ($event->isWorkerIdle()) {
130
                    // stop worker if no messages to process
131
                    $event->getWorker()->stop();
132
133
                    return;
134
                }
135
136
                ++$processCount;
137
            }
138
        );
139
140
        if ($number > 0) {
141
            // stop if limit was placed on number to process
142
            $this->dispatcher->addSubscriber($listeners[] = new StopWorkerOnMessageLimitListener($number));
143
        }
144
145
        if (!$this->isCatchingExceptions()) {
146
            $this->dispatcher->addListener(
147
                WorkerMessageFailedEvent::class,
148
                $listeners[WorkerMessageFailedEvent::class] = static function(WorkerMessageFailedEvent $event) {
149
                    throw $event->getThrowable();
150
                }
151
            );
152
        }
153
154
        $worker = new Worker([$this], $this->bus, $this->dispatcher);
155
        $worker->run(['sleep' => 0]);
156
157
        // remove added listeners/subscribers
158
        foreach ($listeners as $event => $listener) {
159
            if ($listener instanceof EventSubscriberInterface) {
160
                $this->dispatcher->removeSubscriber($listener);
161
162
                continue;
163
            }
164
165
            $this->dispatcher->removeListener($event, $listener);
166
        }
167
168
        if ($number > 0) {
169
            Assert::that($processCount)->is($number, 'Expected to process {expected} messages but only processed {actual}.');
170
        }
171
172
        return $this;
173
    }
174
175
    /**
176
     * Works the same as {@see process()} but fails if no messages on queue.
177
     */
178
    public function processOrFail(int $number = -1): self
179
    {
180
        Assert::true($this->hasMessagesToProcess(), 'No messages to process.');
181
182
        return $this->process($number);
183
    }
184
185
    public function queue(): EnvelopeCollection
186
    {
187
        return new EnvelopeCollection($this, ...self::$queue[$this->name] ?? []);
188
    }
189
190
    public function dispatched(): EnvelopeCollection
191
    {
192
        return new EnvelopeCollection($this, ...self::$dispatched[$this->name] ?? []);
193
    }
194
195
    public function acknowledged(): EnvelopeCollection
196
    {
197
        return new EnvelopeCollection($this, ...self::$acknowledged[$this->name] ?? []);
198
    }
199
200
    public function rejected(): EnvelopeCollection
201
    {
202
        return new EnvelopeCollection($this, ...self::$rejected[$this->name] ?? []);
203
    }
204
205
    /**
206
     * @internal
207
     */
208
    public function get(): iterable
209
    {
210
        return self::$queue[$this->name] ? [\array_shift(self::$queue[$this->name])] : [];
211
    }
212
213
    /**
214
     * @internal
215
     */
216
    public function ack(Envelope $envelope): void
217
    {
218
        self::$acknowledged[$this->name][] = $envelope;
219
    }
220
221
    /**
222
     * @internal
223
     */
224
    public function reject(Envelope $envelope): void
225
    {
226
        self::$rejected[$this->name][] = $envelope;
227
    }
228
229
    public function send(Envelope $envelope): Envelope
230
    {
231
        if ($this->shouldTestSerialization()) {
232
            Assert::try(
233
                fn() => $this->serializer->decode($this->serializer->encode($envelope)),
234
                'A problem occurred in the serialization process.'
235
            );
236
        }
237
238
        self::$dispatched[$this->name][] = $envelope;
239
        self::$queue[$this->name][] = $envelope;
240
241
        if (!$this->isIntercepting()) {
242
            $this->process();
243
        }
244
245
        return $envelope;
246
    }
247
248
    /**
249
     * Resets all the data for this transport.
250
     */
251
    public function reset(): void
252
    {
253
        self::$queue[$this->name] = self::$dispatched[$this->name] = self::$acknowledged[$this->name] = self::$rejected[$this->name] = [];
254
    }
255
256
    /**
257
     * Resets data and options for all transports.
258
     */
259
    public static function resetAll(): void
260
    {
261
        self::$queue = self::$dispatched = self::$acknowledged = self::$rejected = self::$intercept = self::$catchExceptions = [];
262
    }
263
264
    private function isIntercepting(): bool
265
    {
266
        return self::$intercept[$this->name];
267
    }
268
269
    private function isCatchingExceptions(): bool
270
    {
271
        return self::$catchExceptions[$this->name];
272
    }
273
274
    private function shouldTestSerialization(): bool
275
    {
276
        return self::$testSerialization[$this->name];
277
    }
278
279
    private function hasMessagesToProcess(): bool
280
    {
281
        return !empty(self::$queue[$this->name] ?? []);
282
    }
283
}
284