Passed
Branch master (6c6bd5)
by Sébastien
03:15
created

QueueTransport::ack()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 2
c 1
b 0
f 0
dl 0
loc 5
ccs 0
cts 3
cp 0
rs 10
cc 1
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Bdf\QueueMessengerBundle\Transport;
4
5
use Bdf\Queue\Consumer\Receiver\StopWhenEmptyReceiver;
6
use Bdf\Queue\Consumer\Receiver\TimeLimiterReceiver;
7
use Bdf\Queue\Destination\DestinationInterface;
8
use Bdf\Queue\Message\EnvelopeInterface as QueuedEnvelope;
9
use Bdf\Queue\Message\Message as QueueMessage;
10
use Bdf\Queue\Testing\StackMessagesReceiver;
11
use Bdf\QueueMessengerBundle\Transport\Stamp\BdfQueueReceivedStamp;
12
use Bdf\QueueMessengerBundle\Transport\Stamp\DestinationStamp;
13
use Bdf\QueueMessengerBundle\Transport\Stamp\StampsSerializerInterface;
14
use Symfony\Component\Messenger\Envelope;
15
use Symfony\Component\Messenger\Stamp\DelayStamp;
16
use Symfony\Component\Messenger\Stamp\HandledStamp;
17
use Symfony\Component\Messenger\Transport\TransportInterface;
18
19
/**
20
 *
21
 */
22
class QueueTransport implements TransportInterface
23
{
24
    /**
25
     * The destinations instance
26
     *
27
     * @var DestinationInterface
28
     */
29
    private $destination;
30
31
    /**
32
     * @var StampsSerializerInterface
33
     */
34
    private $stampsSerializer;
35
36
    /**
37
     * @var int
38
     */
39
    private $consumerTimeout;
40
41
42
    /**
43
     * Constructor
44
     *
45
     * @param DestinationInterface $manager
46
     * @param StampsSerializerInterface $stampsSerializer
47
     * @param int $consumerTimeout
48
     */
49 1
    public function __construct(DestinationInterface $destination, StampsSerializerInterface $stampsSerializer, int $consumerTimeout = 1)
50
    {
51 1
        $this->destination = $destination;
52 1
        $this->stampsSerializer = $stampsSerializer;
53 1
        $this->consumerTimeout = $consumerTimeout;
54 1
    }
55
56
    /**
57
     * {@inheritdoc}
58
     */
59
    public function get(): iterable
60
    {
61
        $stack = new StackMessagesReceiver();
62
        $extension = new StopWhenEmptyReceiver($stack);
63
        $extension = new TimeLimiterReceiver($extension, $this->consumerTimeout);
64
65
        $consumer = $this->destination->consumer($extension);
66
        $consumer->consume($this->consumerTimeout);
67
68
        foreach ($stack->messages() as $envelope) {
69
            yield $this
70
                ->toEnvelope($envelope->message()->data(), $envelope)
71
                ->with(new BdfQueueReceivedStamp($envelope))
72
            ;
73
        }
74
    }
75
76
    /**
77
     * Get the envelope from the message payload
78
     *
79
     * @param mixed $message
80
     * @param QueuedEnvelope $queuedEnvelope
81
     *
82
     * @return Envelope
83
     */
84
    private function toEnvelope($message, QueuedEnvelope $queuedEnvelope): Envelope
85
    {
86
        if ($message instanceof Envelope) {
87
            return $message;
88
        }
89
90
        return new Envelope($message, $this->unserializeStamps($queuedEnvelope));
91
    }
92
93
    /**
94
     * {@inheritdoc}
95
     */
96
    public function ack(Envelope $envelope): void
97
    {
98
        /** @var BdfQueueReceivedStamp $stamp */
99
        $stamp = $envelope->last(BdfQueueReceivedStamp::class);
100
        $stamp->getEnvelope()->acknowledge();
101
    }
102
103
    /**
104
     * {@inheritdoc}
105
     */
106
    public function reject(Envelope $envelope): void
107
    {
108
        /** @var BdfQueueReceivedStamp $stamp */
109
        $stamp = $envelope->last(BdfQueueReceivedStamp::class);
110
        $stamp->getEnvelope()->reject();
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116
    public function send(Envelope $envelope): Envelope
117
    {
118
        $message = $envelope->getMessage();
119
120
        /** @var DestinationStamp|null $destinationStamp */
121
        $destinationStamp = $envelope->last(DestinationStamp::class) ?: new DestinationStamp();
122
        /** @var DelayStamp|null $delayStamp */
123
        $delayStamp = $envelope->last(DelayStamp::class);
124
125
        $queueMessage = new QueueMessage();
126
        $queueMessage->setData($message);
127
        $queueMessage->setName($destinationStamp->getMessageName() ?: get_class($message));
128
        $queueMessage->setMaxTries($destinationStamp->getMaxTries());
129
        $queueMessage->disableStore($destinationStamp->noStore());
130
        $queueMessage->setNeedsReply($destinationStamp->getNeedsReply());
131
        $queueMessage->setDelay($delayStamp !== null ? $delayStamp->getDelay() : 0);
132
        $queueMessage->setHeaders($destinationStamp->getHeaders());
133
        $queueMessage->addHeader('stamps', $this->serializeStamps($envelope));
134
135
        if ($queueMessage->needsReply()) {
136
            $result = $this->destination->send($queueMessage)->await($destinationStamp->getRpcTimeout());
137
138
            return $envelope->with(new HandledStamp($result, static::class));
139
        }
140
141
        $this->destination->send($queueMessage);
142
143
        return $envelope;
144
    }
145
146
    /**
147
     * Extract stamps from the envelope
148
     * The returned value is a serialization of a single dimension array
149
     */
150
    private function serializeStamps(Envelope $envelope): string
151
    {
152
        $stamps = $envelope->all();
153
154
        if (!empty($stamps)) {
155
            // Convert two dimension array (Envelope::all() return list of stamps, grouping by the stamp class)
156
            // Use array_values to remove the keys (class name) for get a sequential array
157
            $stamps = array_merge(...array_values($stamps));
158
        }
159
160
        return $this->stampsSerializer->serialize($stamps);
161
    }
162
163
    /**
164
     * Extract stamps from the queued envelope
165
     */
166
    private function unserializeStamps(QueuedEnvelope $queuedEnvelope): array
167
    {
168
        if ($stamps = $queuedEnvelope->message()->header('stamps')) {
169
            return $this->stampsSerializer->deserialize($stamps);
170
        }
171
172
        return [];
173
    }
174
}
175
176