QueueTransport::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 2
Bugs 1 Features 1
Metric Value
eloc 3
c 2
b 1
f 1
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 10
cc 1
nc 1
nop 3
crap 1
1
<?php
2
3
namespace Bdf\QueueMessengerBundle\Transport;
4
5
use Bdf\Queue\Consumer\Receiver\StopWhenEmptyReceiver;
6
use Bdf\Queue\Consumer\Receiver\TimeLimiterReceiver;
7
use Bdf\Queue\Destination\DestinationInterface;
8
use Bdf\Queue\Message\EnvelopeInterface as QueuedEnvelope;
9
use Bdf\Queue\Message\Message as QueueMessage;
10
use Bdf\Queue\Testing\StackMessagesReceiver;
11
use Bdf\QueueMessengerBundle\Transport\Stamp\BdfQueueReceivedStamp;
12
use Bdf\QueueMessengerBundle\Transport\Stamp\DestinationStamp;
13
use Bdf\QueueMessengerBundle\Transport\Stamp\StampsSerializerInterface;
14
use Symfony\Component\Messenger\Envelope;
15
use Symfony\Component\Messenger\Stamp\DelayStamp;
16
use Symfony\Component\Messenger\Stamp\HandledStamp;
17
use Symfony\Component\Messenger\Transport\TransportInterface;
18
19
/**
 * Symfony Messenger transport backed by a bdf-queue destination.
 *
 * Outgoing messenger envelopes are wrapped into a queue message and sent to the
 * destination; queue messages consumed from the destination are converted back
 * into messenger envelopes.
 */
class QueueTransport implements TransportInterface
{
    /**
     * The destination used to send and to consume the messages.
     *
     * @var DestinationInterface
     */
    private $destination;

    /**
     * Serializer used to store the envelope stamps into the "stamps" message header.
     *
     * @var StampsSerializerInterface
     */
    private $stampsSerializer;

    /**
     * Value given to the time limiter receiver and to the consumer on each get() call.
     *
     * @var int
     */
    private $consumerTimeout;

    /**
     * Constructor.
     *
     * @param DestinationInterface      $destination      The destination to send to and to consume from
     * @param StampsSerializerInterface $stampsSerializer The serializer of the envelope stamps
     * @param int                       $consumerTimeout  The limit given to the consumer on each get() call
     *                                                    (presumably in seconds - TODO confirm)
     */
    public function __construct(DestinationInterface $destination, StampsSerializerInterface $stampsSerializer, int $consumerTimeout = 1)
    {
        $this->destination = $destination;
        $this->stampsSerializer = $stampsSerializer;
        $this->consumerTimeout = $consumerTimeout;
    }

    /**
     * {@inheritdoc}
     *
     * Runs a consumer on the destination, then yields one messenger envelope per
     * queued envelope collected by the stack receiver. Each yielded envelope carries
     * a BdfQueueReceivedStamp referencing the queued envelope, which is later used
     * by ack() and reject().
     */
    public function get(): iterable
    {
        // The stack receiver stores the received envelopes instead of handling them.
        $stack = new StackMessagesReceiver();
        $extension = new StopWhenEmptyReceiver($stack);
        $extension = new TimeLimiterReceiver($extension, $this->consumerTimeout);

        $consumer = $this->destination->consumer($extension);
        $consumer->consume($this->consumerTimeout);

        foreach ($stack->messages() as $envelope) {
            yield $this
                ->toEnvelope($envelope->message()->data(), $envelope)
                ->with(new BdfQueueReceivedStamp($envelope))
            ;
        }
    }

    /**
     * Get the messenger envelope from the message payload.
     *
     * A payload that is already a messenger envelope is returned untouched. Any
     * other payload is wrapped into a new envelope, with the stamps restored from
     * the "stamps" header of the queued message.
     *
     * @param mixed $message The payload of the queued message
     */
    private function toEnvelope($message, QueuedEnvelope $queuedEnvelope): Envelope
    {
        if ($message instanceof Envelope) {
            return $message;
        }

        return new Envelope($message, $this->unserializeStamps($queuedEnvelope));
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Symfony\Component\Messenger\Exception\LogicException If the envelope has no BdfQueueReceivedStamp
     */
    public function ack(Envelope $envelope): void
    {
        $this->receivedStamp($envelope)->getEnvelope()->acknowledge();
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Symfony\Component\Messenger\Exception\LogicException If the envelope has no BdfQueueReceivedStamp
     */
    public function reject(Envelope $envelope): void
    {
        $this->receivedStamp($envelope)->getEnvelope()->reject();
    }

    /**
     * Get the received stamp of an envelope.
     *
     * Fails with an explicit exception, rather than a fatal "call to a member
     * function on null" error, when the envelope carries no BdfQueueReceivedStamp.
     *
     * @throws \Symfony\Component\Messenger\Exception\LogicException If the stamp is missing
     */
    private function receivedStamp(Envelope $envelope): BdfQueueReceivedStamp
    {
        $stamp = $envelope->last(BdfQueueReceivedStamp::class);

        if (!$stamp instanceof BdfQueueReceivedStamp) {
            throw new \Symfony\Component\Messenger\Exception\LogicException('No BdfQueueReceivedStamp found on the Envelope.');
        }

        return $stamp;
    }

    /**
     * {@inheritdoc}
     *
     * Builds a queue message from the envelope and its DestinationStamp (a default
     * stamp is used when none is set), then sends it to the destination. When the
     * message needs a reply, the reply is awaited and added to the returned
     * envelope as a HandledStamp.
     */
    public function send(Envelope $envelope): Envelope
    {
        $message = $envelope->getMessage();

        /** @var DestinationStamp $destinationStamp */
        $destinationStamp = $envelope->last(DestinationStamp::class) ?? new DestinationStamp();
        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);

        $queueMessage = new QueueMessage();
        $queueMessage->setData($message);
        // Fall back to the message class when the stamp does not define a name.
        $queueMessage->setName($destinationStamp->getMessageName() ?: get_class($message));
        $queueMessage->setMaxTries($destinationStamp->getMaxTries());
        $queueMessage->disableStore($destinationStamp->noStore());
        $queueMessage->setNeedsReply($destinationStamp->getNeedsReply());
        // NOTE(review): Symfony documents DelayStamp::getDelay() in milliseconds;
        // confirm the unit expected by QueueMessage::setDelay() - TODO confirm.
        $queueMessage->setDelay(null !== $delayStamp ? $delayStamp->getDelay() : 0);
        $queueMessage->setHeaders($destinationStamp->getHeaders());
        // Added after setHeaders() so the "stamps" header is part of the final headers.
        $queueMessage->addHeader('stamps', $this->serializeStamps($envelope));

        if ($queueMessage->needsReply()) {
            $result = $this->destination->send($queueMessage)->await($destinationStamp->getRpcTimeout());

            return $envelope->with(new HandledStamp($result, static::class));
        }

        $this->destination->send($queueMessage);

        return $envelope;
    }

    /**
     * Extract and serialize the stamps of the envelope.
     *
     * The serialized value represents a single dimension array of stamps.
     */
    private function serializeStamps(Envelope $envelope): string
    {
        $stamps = $envelope->all();

        // Spreading an empty array would call array_merge() without any argument.
        if (!empty($stamps)) {
            // Envelope::all() returns the stamps grouped by stamp class: drop the
            // class name keys with array_values(), then flatten the groups into a
            // sequential array.
            $stamps = array_merge(...array_values($stamps));
        }

        return $this->stampsSerializer->serialize($stamps);
    }

    /**
     * Extract the stamps from the "stamps" header of the queued message.
     *
     * @return array The deserialized stamps, or an empty array when the header is missing or empty
     */
    private function unserializeStamps(QueuedEnvelope $queuedEnvelope): array
    {
        if ($stamps = $queuedEnvelope->message()->header('stamps')) {
            return $this->stampsSerializer->deserialize($stamps);
        }

        return [];
    }
}
166