Passed
Push — 1.x ( 9b0f6c...854901 )
by Kevin
01:50
created

TestTransport::hasMessagesToProcess()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use PHPUnit\Framework\Assert as PHPUnit;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Messenger\Test\EnvelopeCollection;
16
17
/**
 * In-memory messenger transport intended for use in tests.
 *
 * Every envelope that is sent, acknowledged or rejected is recorded, and
 * envelopes waiting to be handled are kept on a queue. All of this state, as
 * well as the "intercept" and "catch_exceptions" switches, lives in static
 * arrays keyed by transport name, so it is shared by every instance that was
 * created with the same name.
 *
 * @author Kevin Bond <[email protected]>
 */
final class TestTransport implements TransportInterface
{
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
    ];

    private string $name;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    /**
     * Options resolved in the constructor (defaults merged with the caller's
     * overrides). Used as a fallback when the shared static switches have been
     * cleared by resetAll().
     *
     * @var array<string, mixed>
     */
    private array $options;

    /** @var array<string, bool> */
    private static array $intercept = [];

    /** @var array<string, bool> */
    private static array $catchExceptions = [];

    /** @var array<string, Envelope[]> */
    private static array $sent = [];

    /** @var array<string, Envelope[]> */
    private static array $acknowledged = [];

    /** @var array<string, Envelope[]> */
    private static array $rejected = [];

    /** @var array<string, Envelope[]> */
    private static array $queue = [];

    /**
     * @internal
     *
     * @param string              $name       key under which this transport's shared state is stored
     * @param MessageBusInterface $bus        bus handed to the worker when messages are processed
     * @param SerializerInterface $serializer used by send() to verify envelopes survive an encode/decode round trip
     * @param array               $options    overrides for DEFAULT_OPTIONS ("intercept", "catch_exceptions")
     */
    public function __construct(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = [])
    {
        $this->name = $name;
        $this->bus = $bus;
        $this->serializer = $serializer;
        $this->options = \array_merge(self::DEFAULT_OPTIONS, $options);

        // Only seed the shared switches when no value exists yet for this name,
        // so a mode chosen through an earlier instance is not overwritten.
        self::$intercept[$name] ??= $this->options['intercept'];
        self::$catchExceptions[$name] ??= $this->options['catch_exceptions'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        if ($this->hasMessagesToProcess()) {
            // drain what is already waiting; process() asserts the queue is non-empty
            $this->process();
        }

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Switches this transport to "catch exceptions" mode: process() will not
     * register the listener that re-throws handler failures.
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Switches this transport to "throw exceptions" mode: process() registers
     * a listener that re-throws the throwable of a failed message.
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Processes messages on the queue. This is done recursively so if handling
     * a message dispatches more messages, these will be processed as well (up
     * to $number).
     *
     * Asserts (via PHPUnit) that the queue is not empty before starting and,
     * when $number is positive, that exactly $number messages were processed.
     *
     * @param int $number the number of messages to process (-1 for all)
     */
    public function process(int $number = -1): self
    {
        PHPUnit::assertTrue($this->hasMessagesToProcess(), 'No messages to process.');

        $processCount = 0;
        $eventDispatcher = new EventDispatcher();

        $eventDispatcher->addListener(
            WorkerRunningEvent::class,
            static function(WorkerRunningEvent $event) use (&$processCount): void {
                if (!$event->isWorkerIdle()) {
                    // a non-idle tick is counted as one processed message
                    ++$processCount;

                    return;
                }

                // stop worker if no messages to process
                $event->getWorker()->stop();
            }
        );

        if ($number > 0) {
            // stop if limit was placed on number to process
            $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($number));
        }

        if (!$this->isCatchingExceptions()) {
            // surface handler failures to the caller instead of letting the worker absorb them
            $eventDispatcher->addListener(
                WorkerMessageFailedEvent::class,
                static function(WorkerMessageFailedEvent $event): void {
                    throw $event->getThrowable();
                }
            );
        }

        $worker = new Worker([$this], $this->bus, $eventDispatcher);
        $worker->run(['sleep' => 0]);

        if ($number > 0) {
            PHPUnit::assertSame($number, $processCount, "Expected to process {$number} messages but only {$processCount} was processed.");
        }

        return $this;
    }

    /**
     * @return EnvelopeCollection the envelopes currently waiting on this transport's queue
     */
    public function queue(): EnvelopeCollection
    {
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to send() since the last reset
     */
    public function dispatched(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$sent[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to ack() since the last reset
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$acknowledged[$this->name] ?? []));
    }

    /**
     * @return EnvelopeCollection every envelope passed to reject() since the last reset
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$rejected[$this->name] ?? []));
    }

    /**
     * Returns all envelopes currently on the queue as a list.
     *
     * NOTE(review): entries are only removed by ack()/reject(), so this also
     * returns envelopes whose handling is still in progress when process() is
     * re-entered from send() - confirm that cannot re-deliver a message.
     *
     * @internal
     */
    public function get(): iterable
    {
        return \array_values(self::$queue[$this->name] ?? []);
    }

    /**
     * Records the envelope as acknowledged and removes its message from the
     * queue.
     *
     * @internal
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;
        $this->removeFromQueue($envelope);
    }

    /**
     * Records the envelope as rejected and removes its message from the queue.
     *
     * @internal
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;
        $this->removeFromQueue($envelope);
    }

    /**
     * Records the envelope as sent and places it on the queue. When the
     * transport is not intercepting, the queue is processed right away.
     *
     * Any exception raised by the serializer round trip propagates to the
     * caller before anything is recorded.
     *
     * @return Envelope the envelope that was passed in
     */
    public function send(Envelope $envelope): Envelope
    {
        // ensure serialization works (todo configurable? better error on failure?)
        $this->serializer->decode($this->serializer->encode($envelope));

        self::$sent[$this->name][] = $envelope;
        self::$queue[$this->name][self::queueKey($envelope)] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport.
     *
     * The intercept / catch-exceptions switches are left untouched.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = [];
        self::$sent[$this->name] = [];
        self::$acknowledged[$this->name] = [];
        self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     *
     * Instances that already exist keep working afterwards: they fall back to
     * the options they were constructed with until a switch is set again.
     */
    public static function resetAll(): void
    {
        self::$queue = self::$sent = self::$acknowledged = self::$rejected = [];
        self::$intercept = self::$catchExceptions = [];
    }

    /**
     * Whether sent messages are held on the queue instead of being processed
     * immediately.
     */
    private function isIntercepting(): bool
    {
        // resetAll() clears the shared switch; fall back to this instance's
        // own option instead of reading an undefined key.
        return self::$intercept[$this->name] ?? $this->options['intercept'];
    }

    /**
     * Whether process() should refrain from re-throwing handler failures.
     */
    private function isCatchingExceptions(): bool
    {
        // same fallback as isIntercepting(): the shared switch may have been cleared
        return self::$catchExceptions[$this->name] ?? $this->options['catch_exceptions'];
    }

    /**
     * Whether at least one envelope is waiting on this transport's queue.
     */
    private function hasMessagesToProcess(): bool
    {
        return !empty(self::$queue[$this->name]);
    }

    /**
     * Drops the queue entry belonging to the envelope's message, if any.
     */
    private function removeFromQueue(Envelope $envelope): void
    {
        unset(self::$queue[$this->name][self::queueKey($envelope)]);
    }

    /**
     * Key under which an envelope is stored on the queue.
     *
     * The key is derived from the wrapped message object rather than from the
     * envelope itself. NOTE(review): sending the same message object twice
     * therefore yields a single queue entry - confirm this is intended.
     */
    private static function queueKey(Envelope $envelope): string
    {
        return \spl_object_hash($envelope->getMessage());
    }
}
245