Passed
Push — 1.x ( 774941...781dbf )
by Kevin
03:22 queued 01:44
created

TestTransport::resetAll()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use PHPUnit\Framework\Assert as PHPUnit;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
10
use Symfony\Component\Messenger\MessageBusInterface;
11
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
12
use Symfony\Component\Messenger\Transport\TransportInterface;
13
use Symfony\Component\Messenger\Worker;
14
use Zenstruck\Messenger\Test\EnvelopeCollection;
15
16
/**
 * In-memory Messenger transport intended for use in tests.
 *
 * Every envelope that is sent, queued, acknowledged or rejected is recorded
 * in static arrays keyed by transport name, so the recorded data is shared by
 * all instances created with the same name. While a transport is
 * "intercepting", sent messages stay on the queue until process() is called;
 * otherwise they are processed as soon as they are sent.
 *
 * @author Kevin Bond <[email protected]>
 */
final class TestTransport implements TransportInterface
{
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
    ];

    private string $name;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    /**
     * Options this instance was constructed with (defaults merged in). Used as
     * a fallback when the shared static options were wiped by resetAll().
     *
     * @var array<string, mixed>
     */
    private array $options;

    /** @var array<string, bool> */
    private static array $intercept = [];

    /** @var array<string, bool> */
    private static array $catchExceptions = [];

    /** @var array<string, Envelope[]> */
    private static array $sent = [];

    /** @var array<string, Envelope[]> */
    private static array $acknowledged = [];

    /** @var array<string, Envelope[]> */
    private static array $rejected = [];

    /** @var array<string, Envelope[]> */
    private static array $queue = [];

    /**
     * @internal
     *
     * @param string              $name       key under which this transport's data and options are stored
     * @param MessageBusInterface $bus        bus handed to the Worker when queued messages are processed
     * @param SerializerInterface $serializer used by send() to verify that envelopes can be encoded/decoded
     * @param array               $options    overrides for DEFAULT_OPTIONS ("intercept", "catch_exceptions")
     */
    public function __construct(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = [])
    {
        $this->name = $name;
        $this->bus = $bus;
        $this->serializer = $serializer;
        $this->options = \array_merge(self::DEFAULT_OPTIONS, $options);

        // "??=" keeps a mode that was already chosen for this name (e.g. via
        // unblock() on another instance) instead of overwriting it.
        self::$intercept[$name] ??= $this->options['intercept'];
        self::$catchExceptions[$name] ??= $this->options['catch_exceptions'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        // process any messages currently on queue
        $this->process();

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Stops process() from re-throwing the throwable of a failed message.
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Makes process() re-throw the throwable of a failed message.
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Runs a Worker over this transport's queue.
     *
     * Nothing happens when the queue is empty or when zero messages are
     * requested. Requesting more messages than are queued (with a non-empty
     * queue) fails a PHPUnit assertion.
     *
     * @param int|null $number int: the number of messages on the queue to process
     *                         null: process all messages on the queue
     */
    public function process(?int $number = null): self
    {
        $count = \count(self::$queue[$this->name] ?? []);
        $number ??= $count;

        // StopWorkerOnMessageLimitListener refuses a limit of zero, so
        // "process nothing" has to be short-circuited before building it.
        if (0 === $count || 0 === $number) {
            return $this;
        }

        PHPUnit::assertGreaterThanOrEqual($number, $count, "Tried to process {$number} queued messages but only {$count} are on in the queue.");

        $eventDispatcher = new EventDispatcher();
        $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($number));

        if (!$this->isCatchingExceptions()) {
            // surface the failure to the caller instead of leaving it in the event
            $eventDispatcher->addListener(WorkerMessageFailedEvent::class, static function(WorkerMessageFailedEvent $event): void {
                throw $event->getThrowable();
            });
        }

        $worker = new Worker([$this], $this->bus, $eventDispatcher);
        $worker->run(['sleep' => 0]);

        return $this;
    }

    /**
     * Envelopes that are currently waiting on the queue.
     */
    public function queue(): EnvelopeCollection
    {
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
    }

    /**
     * Every envelope passed to send() since the last reset.
     */
    public function sent(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$sent[$this->name] ?? []));
    }

    /**
     * Every envelope passed to ack() since the last reset.
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$acknowledged[$this->name] ?? []));
    }

    /**
     * Every envelope passed to reject() since the last reset.
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$rejected[$this->name] ?? []));
    }

    /**
     * @internal
     *
     * @return iterable the queued envelopes as a list (queue keys are dropped)
     */
    public function get(): iterable
    {
        return \array_values(self::$queue[$this->name] ?? []);
    }

    /**
     * @internal
     *
     * Records the envelope as acknowledged and removes its message from the queue.
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;
        unset(self::$queue[$this->name][self::queueKey($envelope)]);
    }

    /**
     * @internal
     *
     * Records the envelope as rejected and removes its message from the queue.
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;
        unset(self::$queue[$this->name][self::queueKey($envelope)]);
    }

    /**
     * @internal
     *
     * Records the envelope as sent, puts it on the queue and, unless this
     * transport is intercepting, processes the queue right away.
     */
    public function send(Envelope $envelope): Envelope
    {
        // ensure serialization works (todo configurable? better error on failure?)
        $this->serializer->decode($this->serializer->encode($envelope));

        self::$sent[$this->name][] = $envelope;
        self::$queue[$this->name][self::queueKey($envelope)] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = [];
        self::$sent[$this->name] = [];
        self::$acknowledged[$this->name] = [];
        self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     */
    public static function resetAll(): void
    {
        self::$queue = [];
        self::$sent = [];
        self::$acknowledged = [];
        self::$rejected = [];
        self::$intercept = [];
        self::$catchExceptions = [];
    }

    /**
     * Whether sent messages are held on the queue instead of being processed
     * immediately. Falls back to this instance's own option when the shared
     * value was cleared by resetAll() after the instance was created.
     */
    private function isIntercepting(): bool
    {
        return self::$intercept[$this->name] ?? $this->options['intercept'];
    }

    /**
     * Whether failures during process() are kept from the caller. Falls back
     * to this instance's own option when the shared value was cleared by
     * resetAll() after the instance was created.
     */
    private function isCatchingExceptions(): bool
    {
        return self::$catchExceptions[$this->name] ?? $this->options['catch_exceptions'];
    }

    /**
     * Queue key for an envelope, derived from its message object rather than
     * the envelope itself.
     *
     * NOTE(review): presumably the message is used because the envelope given
     * to ack()/reject() is not the same instance that was passed to send() -
     * confirm. A consequence is that sending the same message object twice
     * keeps only one queue entry.
     */
    private static function queueKey(Envelope $envelope): string
    {
        return \spl_object_hash($envelope->getMessage());
    }
}
224