Passed
Pull Request — 1.x (#8)
by Kevin
01:37
created

TestTransport::create()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 4
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use PHPUnit\Framework\Assert as PHPUnit;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
10
use Symfony\Component\Messenger\MessageBusInterface;
11
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
12
use Symfony\Component\Messenger\Transport\TransportInterface;
13
use Symfony\Component\Messenger\Worker;
14
use Zenstruck\Messenger\Test\EnvelopeCollection;
15
16
/**
17
 * @author Kevin Bond <[email protected]>
18
 */
19
final class TestTransport implements TransportInterface
20
{
21
    private const DEFAULT_OPTIONS = [
22
        'intercept' => true,
23
        'catch_exceptions' => true,
24
    ];
25
26
    private string $name;
27
    private MessageBusInterface $bus;
28
    private SerializerInterface $serializer;
29
    private bool $intercept;
30
    private bool $catchExceptions;
31
32
    /** @var array<string, Envelope[]> */
33
    private static array $sent = [];
34
35
    /** @var array<string, Envelope[]> */
36
    private static array $acknowledged = [];
37
38
    /** @var array<string, Envelope[]> */
39
    private static array $rejected = [];
40
41
    /** @var array<string, Envelope[]> */
42
    private static array $queue = [];
43
44
    /** @var array<string, self> */
45
    private static array $transports = [];
46
47
    private function __construct(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = [])
48
    {
49
        $options = \array_merge(self::DEFAULT_OPTIONS, $options);
50
51
        $this->name = $name;
52
        $this->bus = $bus;
53
        $this->serializer = $serializer;
54
        $this->intercept = $options['intercept'];
55
        $this->catchExceptions = $options['catch_exceptions'];
56
    }
57
58
    /**
59
     * @internal
60
     */
61
    public static function create(string $name, MessageBusInterface $bus, SerializerInterface $serializer, array $options = []): self
62
    {
63
        return self::$transports[$name] ??= new self($name, $bus, $serializer, $options);
64
    }
65
66
    /**
67
     * Processes any messages on the queue and processes future messages
68
     * immediately.
69
     */
70
    public function unblock(): self
71
    {
72
        // process any messages currently on queue
73
        $this->process();
74
75
        $this->intercept = false;
76
77
        return $this;
78
    }
79
80
    /**
81
     * Intercepts any future messages sent to queue.
82
     */
83
    public function intercept(): self
84
    {
85
        $this->intercept = true;
86
87
        return $this;
88
    }
89
90
    public function catchExceptions(): self
91
    {
92
        $this->catchExceptions = true;
93
94
        return $this;
95
    }
96
97
    public function throwExceptions(): self
98
    {
99
        $this->catchExceptions = false;
100
101
        return $this;
102
    }
103
104
    /**
105
     * @param int|null $number int: the number of messages on the queue to process
106
     *                         null: process all messages on the queue
107
     */
108
    public function process(?int $number = null): self
109
    {
110
        $count = \count(self::$queue[$this->name] ?? []);
111
112
        if (null === $number) {
113
            return $this->process($count);
114
        }
115
116
        if (0 === $count) {
117
            return $this;
118
        }
119
120
        PHPUnit::assertGreaterThanOrEqual($number, $count, "Tried to process {$number} queued messages but only {$count} are on in the queue.");
121
122
        $eventDispatcher = new EventDispatcher();
123
        $eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($number));
124
125
        if (!$this->catchExceptions) {
126
            $eventDispatcher->addListener(WorkerMessageFailedEvent::class, function(WorkerMessageFailedEvent $event) {
127
                throw $event->getThrowable();
128
            });
129
        }
130
131
        $worker = new Worker([$this], $this->bus, $eventDispatcher);
132
        $worker->run(['sleep' => 0]);
133
134
        return $this;
135
    }
136
137
    public function queue(): EnvelopeCollection
138
    {
139
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
140
    }
141
142
    public function sent(): EnvelopeCollection
143
    {
144
        return new EnvelopeCollection(...self::$sent[$this->name] ?? []);
145
    }
146
147
    public function acknowledged(): EnvelopeCollection
148
    {
149
        return new EnvelopeCollection(...self::$acknowledged[$this->name] ?? []);
150
    }
151
152
    public function rejected(): EnvelopeCollection
153
    {
154
        return new EnvelopeCollection(...self::$rejected[$this->name] ?? []);
155
    }
156
157
    /**
158
     * @internal
159
     */
160
    public function get(): iterable
161
    {
162
        return \array_values(self::$queue[$this->name] ?? []);
163
    }
164
165
    /**
166
     * @internal
167
     */
168
    public function ack(Envelope $envelope): void
169
    {
170
        self::$acknowledged[$this->name][] = $envelope;
171
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
172
    }
173
174
    /**
175
     * @internal
176
     */
177
    public function reject(Envelope $envelope): void
178
    {
179
        self::$rejected[$this->name][] = $envelope;
180
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
181
    }
182
183
    /**
184
     * @internal
185
     */
186
    public function send(Envelope $envelope): Envelope
187
    {
188
        // ensure serialization works (todo configurable? better error on failure?)
189
        $this->serializer->decode($this->serializer->encode($envelope));
190
191
        self::$sent[$this->name][] = $envelope;
192
        self::$queue[$this->name][\spl_object_hash($envelope->getMessage())] = $envelope;
193
194
        if (!$this->intercept) {
195
            $this->process();
196
        }
197
198
        return $envelope;
199
    }
200
201
    public static function reset(): void
202
    {
203
        self::$transports = self::$queue = self::$sent = self::$acknowledged = self::$rejected = [];
204
    }
205
}
206