Passed
Push — 1.x ( cc2d2c...e54feb )
by Kevin
01:40
created

TestTransport::process()   B

Complexity

Conditions 7
Paths 24

Size

Total Lines 54
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
eloc 26
c 1
b 0
f 0
nc 24
nop 1
dl 0
loc 54
rs 8.5706

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

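Commonly applied refactorings include Extract Method: move a cohesive part of the long method into a new, well-named method and call it from the original.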

1
<?php
2
3
namespace Zenstruck\Messenger\Test\Transport;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7
use Symfony\Component\Messenger\Envelope;
8
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
9
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
10
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
11
use Symfony\Component\Messenger\MessageBusInterface;
12
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
13
use Symfony\Component\Messenger\Transport\TransportInterface;
14
use Symfony\Component\Messenger\Worker;
15
use Zenstruck\Assert;
16
use Zenstruck\Messenger\Test\EnvelopeCollection;
17
18
/**
 * Messenger transport for tests that keeps every envelope in static,
 * in-memory arrays keyed by transport name.
 *
 * Because the state is static, it is shared by all instances created with
 * the same name and persists until {@see reset()} or {@see resetAll()} is
 * called.
 *
 * @author Kevin Bond
 */
final class TestTransport implements TransportInterface
{
    private const DEFAULT_OPTIONS = [
        'intercept' => true,
        'catch_exceptions' => true,
    ];

    private string $name;
    private EventDispatcherInterface $dispatcher;
    private MessageBusInterface $bus;
    private SerializerInterface $serializer;

    /** @var array<string, bool> whether messages are held on the queue, per transport name */
    private static array $intercept = [];

    /** @var array<string, bool> whether handler failures are caught, per transport name */
    private static array $catchExceptions = [];

    /** @var array<string, Envelope[]> every envelope passed to send(), per transport name */
    private static array $dispatched = [];

    /** @var array<string, Envelope[]> every envelope passed to ack(), per transport name */
    private static array $acknowledged = [];

    /** @var array<string, Envelope[]> every envelope passed to reject(), per transport name */
    private static array $rejected = [];

    /** @var array<string, Envelope[]> envelopes waiting to be processed, keyed by message object hash */
    private static array $queue = [];

    /**
     * @internal
     *
     * @param array<string, bool> $options overrides for {@see self::DEFAULT_OPTIONS}
     */
    public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
    {
        $options = \array_merge(self::DEFAULT_OPTIONS, $options);

        $this->name = $name;
        $this->dispatcher = $dispatcher;
        $this->bus = $bus;
        $this->serializer = $serializer;

        // only initialise the static flags the first time a transport with
        // this name is created so runtime changes are not overwritten
        self::$intercept[$name] ??= $options['intercept'];
        self::$catchExceptions[$name] ??= $options['catch_exceptions'];
    }

    /**
     * Processes any messages on the queue and processes future messages
     * immediately.
     */
    public function unblock(): self
    {
        if ($this->hasMessagesToProcess()) {
            // process any messages currently on queue
            $this->process();
        }

        self::$intercept[$this->name] = false;

        return $this;
    }

    /**
     * Intercepts any future messages sent to queue.
     */
    public function intercept(): self
    {
        self::$intercept[$this->name] = true;

        return $this;
    }

    /**
     * Marks this transport as catching exceptions: {@see process()} will not
     * register its re-throwing failure listener.
     */
    public function catchExceptions(): self
    {
        self::$catchExceptions[$this->name] = true;

        return $this;
    }

    /**
     * Marks this transport as throwing exceptions: {@see process()} re-throws
     * the throwable carried by any WorkerMessageFailedEvent.
     */
    public function throwExceptions(): self
    {
        self::$catchExceptions[$this->name] = false;

        return $this;
    }

    /**
     * Processes messages on the queue. This is done recursively so if handling
     * a message dispatches more messages, these will be processed as well (up
     * to $number).
     *
     * The listeners/subscribers registered for the run are always removed
     * again, even when the worker exits with an exception.
     *
     * @param int $number the number of messages to process (-1 for all)
     */
    public function process(int $number = -1): self
    {
        $processCount = 0;

        // keep track of added listeners/subscribers so we can remove after
        $listeners = [];

        try {
            $this->dispatcher->addListener(
                WorkerRunningEvent::class,
                $listeners[WorkerRunningEvent::class] = static function(WorkerRunningEvent $event) use (&$processCount): void {
                    if ($event->isWorkerIdle()) {
                        // stop worker if no messages to process
                        $event->getWorker()->stop();

                        return;
                    }

                    ++$processCount;
                }
            );

            if ($number > 0) {
                // stop if limit was placed on number to process
                $this->dispatcher->addSubscriber($listeners[] = new StopWorkerOnMessageLimitListener($number));
            }

            if (!$this->isCatchingExceptions()) {
                $this->dispatcher->addListener(
                    WorkerMessageFailedEvent::class,
                    $listeners[WorkerMessageFailedEvent::class] = static function(WorkerMessageFailedEvent $event): void {
                        throw $event->getThrowable();
                    }
                );
            }

            $worker = new Worker([$this], $this->bus, $this->dispatcher);
            $worker->run(['sleep' => 0]);
        } finally {
            // remove added listeners/subscribers - done in "finally" so a
            // re-thrown handler exception cannot leave them registered on the
            // dispatcher where they would affect later process() calls
            foreach ($listeners as $event => $listener) {
                if ($listener instanceof EventSubscriberInterface) {
                    $this->dispatcher->removeSubscriber($listener);

                    continue;
                }

                $this->dispatcher->removeListener($event, $listener);
            }
        }

        if ($number > 0) {
            Assert::that($processCount)->is($number, 'Expected to process {expected} messages but only processed {actual}.');
        }

        return $this;
    }

    /**
     * Works the same as {@see process()} but fails if no messages on queue.
     */
    public function processOrFail(int $number = -1): self
    {
        Assert::true($this->hasMessagesToProcess(), 'No messages to process.');

        return $this->process($number);
    }

    /**
     * Envelopes currently waiting on this transport's queue.
     */
    public function queue(): EnvelopeCollection
    {
        // the queue is keyed by object hash; array_values() turns it into a
        // list so it can be spread as positional arguments
        return new EnvelopeCollection(...\array_values(self::$queue[$this->name] ?? []));
    }

    /**
     * Every envelope sent to this transport since the last reset.
     */
    public function dispatched(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$dispatched[$this->name] ?? []));
    }

    /**
     * Every envelope acknowledged on this transport since the last reset.
     */
    public function acknowledged(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$acknowledged[$this->name] ?? []));
    }

    /**
     * Every envelope rejected on this transport since the last reset.
     */
    public function rejected(): EnvelopeCollection
    {
        return new EnvelopeCollection(...(self::$rejected[$this->name] ?? []));
    }

    /**
     * @internal
     *
     * @return Envelope[] all envelopes currently on the queue
     */
    public function get(): iterable
    {
        return \array_values(self::$queue[$this->name] ?? []);
    }

    /**
     * @internal
     *
     * Records the envelope as acknowledged and removes it from the queue.
     */
    public function ack(Envelope $envelope): void
    {
        self::$acknowledged[$this->name][] = $envelope;
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
    }

    /**
     * @internal
     *
     * Records the envelope as rejected and removes it from the queue.
     */
    public function reject(Envelope $envelope): void
    {
        self::$rejected[$this->name][] = $envelope;
        unset(self::$queue[$this->name][\spl_object_hash($envelope->getMessage())]);
    }

    /**
     * Records the envelope as dispatched and queues it; when the transport is
     * not intercepting, the queue is processed straight away.
     */
    public function send(Envelope $envelope): Envelope
    {
        // ensure serialization works (todo configurable? better error on failure?)
        $this->serializer->decode($this->serializer->encode($envelope));

        self::$dispatched[$this->name][] = $envelope;

        // keyed by the message's object hash so ack()/reject() can find and
        // remove the entry again
        self::$queue[$this->name][\spl_object_hash($envelope->getMessage())] = $envelope;

        if (!$this->isIntercepting()) {
            $this->process();
        }

        return $envelope;
    }

    /**
     * Resets all the data for this transport.
     */
    public function reset(): void
    {
        self::$queue[$this->name] = self::$dispatched[$this->name] = self::$acknowledged[$this->name] = self::$rejected[$this->name] = [];
    }

    /**
     * Resets data and options for all transports.
     */
    public static function resetAll(): void
    {
        self::$queue = self::$dispatched = self::$acknowledged = self::$rejected = self::$intercept = self::$catchExceptions = [];
    }

    /**
     * Whether sent messages are currently held on the queue.
     */
    private function isIntercepting(): bool
    {
        return self::$intercept[$this->name];
    }

    /**
     * Whether process() should leave handler failures un-thrown.
     */
    private function isCatchingExceptions(): bool
    {
        return self::$catchExceptions[$this->name];
    }

    /**
     * Whether at least one envelope is waiting on this transport's queue.
     */
    private function hasMessagesToProcess(): bool
    {
        return !empty(self::$queue[$this->name] ?? []);
    }
}
272