Passed
Pull Request — 1.x (#17)
by Kevin
01:46
created

TestTransport::hasMessagesToProcess()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use PHPUnit\Framework\Assert as PHPUnit;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Messenger\Test\EnvelopeCollection;
16
17
/**
18
 * @author Kevin Bond <[email protected]>
19
 */
20
final class TestTransport implements TransportInterface
{
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
    ];

    private string $name;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    /**
     * Options this instance was constructed with (defaults merged in). Used as
     * a fallback whenever the shared per-transport flags below have been
     * cleared by {@see resetAll()}.
     *
     * @var array<string, mixed>
     */
    private array $options;

    /**
     * Whether sent messages are held on the queue (true) or processed
     * immediately (false), keyed by transport name.
     *
     * @var array<string, bool>
     */
    private static array $intercept = [];

    /**
     * Whether handler failures are left to the worker (true) or re-thrown
     * while processing (false), keyed by transport name.
     *
     * @var array<string, bool>
     */
    private static array $catchExceptions = [];

    /**
     * Every envelope passed to send(), keyed by transport name.
     *
     * @var array<string, Envelope[]>
     */
    private static array $sent = [];

    /**
     * Every envelope passed to ack(), keyed by transport name.
     *
     * @var array<string, Envelope[]>
     */
    private static array $acknowledged = [];

    /**
     * Every envelope passed to reject(), keyed by transport name.
     *
     * @var array<string, Envelope[]>
     */
    private static array $rejected = [];

    /**
     * Envelopes waiting to be processed, keyed by transport name and then by
     * the spl_object_hash() of the wrapped message.
     *
     * NOTE(review): because the key is derived from the message object, sending
     * the very same message instance twice overwrites its queue entry (both
     * sends are still recorded in $sent) - confirm this is intended.
     *
     * @var array<string, Envelope[]>
     */
    private static array $queue = [];

    /**
     * @internal
     *
     * @param string               $name    key under which this transport's shared state is stored
     * @param array<string, mixed> $options overrides for "intercept" and "catch_exceptions"
     */
    public function __construct(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = [])
    {
        $this->name = $name;
        $this->bus = $bus;
        $this->serializer = $serializer;
        $this->options = \array_merge(self::DEFAULT_OPTIONS, $options);

        // Only seed the shared flags when absent so that values changed through
        // intercept()/unblock()/catchExceptions()/throwExceptions() survive a
        // re-construction under the same name.
        self::$intercept[$name] ??= $this->options['intercept'];
        self::$catchExceptions[$name] ??= $this->options['catch_exceptions'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        // process() asserts that the queue is non-empty, so only drain the
        // queue when there is something on it.
        if ($this->hasMessagesToProcess()) {
            $this->process();
        }

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Stops re-throwing handler failures while processing (see process()).
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Re-throws the throwable of a failed message while processing (see
     * process()).
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Processes messages on the queue. This is done recursively so if handling
     * a message dispatches more messages, these will be processed as well (up
     * to $number).
     *
     * Fails a PHPUnit assertion when the queue is empty, or when a positive
     * $number was requested and a different number of messages was processed.
     *
     * @param int $number the number of messages to process (-1 for all)
     */
    public function process(int $number = -1): self
    {
        PHPUnit::assertTrue($this->hasMessagesToProcess(), 'No messages to process.');

        $eventDispatcher = new EventDispatcher();
        $processedCount = 0;

        $eventDispatcher->addListener(
            WorkerRunningEvent::class,
            static function(WorkerRunningEvent $event) use (&$processedCount): void {
                if ($event->isWorkerIdle()) {
                    // stop worker if no messages to process
                    $event->getWorker()->stop();

                    return;
                }

                ++$processedCount;
            }
        );

        if ($number > 0) {
            // stop if limit was placed on number to process
            $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($number));
        }

        if (!$this->isCatchingExceptions()) {
            // surface the handler failure to the caller
            $eventDispatcher->addListener(
                WorkerMessageFailedEvent::class,
                static function(WorkerMessageFailedEvent $event): void {
                    throw $event->getThrowable();
                }
            );
        }

        $worker = new Worker([$this], $this->bus, $eventDispatcher);
        $worker->run(['sleep' => 0]);

        if ($number > 0) {
            PHPUnit::assertSame($number, $processedCount, "Expected to process {$number} messages but only {$processedCount} was processed.");
        }

        return $this;
    }

    /**
     * Envelopes currently waiting on the queue.
     */
    public function queue(): EnvelopeCollection
    {
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
    }

    /**
     * Every envelope sent to this transport, whether or not it has been
     * processed yet.
     */
    public function dispatched(): EnvelopeCollection
    {
        return new EnvelopeCollection(...self::$sent[$this->name] ?? []);
    }

    /**
     * Envelopes that were acknowledged through ack().
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection(...self::$acknowledged[$this->name] ?? []);
    }

    /**
     * Envelopes that were rejected through reject().
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection(...self::$rejected[$this->name] ?? []);
    }

    /**
     * @internal
     *
     * @return Envelope[] all queued envelopes, re-indexed from 0
     */
    public function get(): iterable
    {
        return \array_values(self::$queue[$this->name] ?? []);
    }

    /**
     * @internal
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;

        // the queue is keyed by the message object, not the envelope
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
    }

    /**
     * @internal
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;

        // the queue is keyed by the message object, not the envelope
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
    }

    /**
     * Records and queues the envelope, then processes the queue right away
     * unless this transport is intercepting.
     */
    public function send(Envelope $envelope): Envelope
    {
        // ensure serialization works (todo configurable? better error on failure?)
        $this->serializer->decode($this->serializer->encode($envelope));

        self::$sent[$this->name][] = $envelope;
        self::$queue[$this->name][\spl_object_hash($envelope->getMessage())] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = [];
        self::$sent[$this->name] = [];
        self::$acknowledged[$this->name] = [];
        self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     */
    public static function resetAll(): void
    {
        self::$queue = [];
        self::$sent = [];
        self::$acknowledged = [];
        self::$rejected = [];
        self::$intercept = [];
        self::$catchExceptions = [];
    }

    /**
     * Falls back to this instance's configured option when the shared flag was
     * cleared by resetAll(); without the fallback the missing key would yield
     * null and violate the bool return type.
     */
    private function isIntercepting(): bool
    {
        return self::$intercept[$this->name] ?? $this->options['intercept'];
    }

    /**
     * Falls back to this instance's configured option when the shared flag was
     * cleared by resetAll(); without the fallback the missing key would yield
     * null and violate the bool return type.
     */
    private function isCatchingExceptions(): bool
    {
        return self::$catchExceptions[$this->name] ?? $this->options['catch_exceptions'];
    }

    private function hasMessagesToProcess(): bool
    {
        return [] !== (self::$queue[$this->name] ?? []);
    }
}
242