Issues (19)

src/RabbitMQPublisher.php (1 issue)

Labels
Severity
1
<?php
2
3
namespace Kunnu\RabbitMQ;
4
5
use PhpAmqpLib\Wire\AMQPTable;
6
use Illuminate\Support\Collection;
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class RabbitMQPublisher
{
    /**
     * Number of messages queued on the channel before the batch is flushed.
     *
     * @var int
     */
    protected int $maxBatchSize = 200;

    /**
     * Manager used to resolve configuration, connections and channels.
     *
     * @var RabbitMQManager
     */
    protected RabbitMQManager $manager;

    /**
     * Create a new RabbitMQ Publisher instance.
     *
     * @param  RabbitMQManager  $manager
     */
    public function __construct(RabbitMQManager $manager)
    {
        $this->manager = $manager;
    }

    /**
     * Set the max batch size.
     *
     * A size of zero or less makes every message flush on its own, because
     * the batch threshold is reached as soon as one message is queued.
     *
     * @param  int  $size
     * @return self
     */
    public function setMaxBatchSize(int $size): self
    {
        $this->maxBatchSize = $size;

        return $this;
    }

    /**
     * Publish message(s).
     *
     * Each message's configuration and exchange are completed from the
     * default, connection and publish configuration before the messages are
     * handed to publishBulk().
     *
     * @param  RabbitMQMessage[]|RabbitMQMessage  $messages
     * @param  string  $routingKey
     * @param  string|null  $connectionName  Falls back to the manager's default config name.
     * @param  PublishConfig|null  $publishConfig  Falls back to an empty PublishConfig.
     * @return void
     *
     * @throws RabbitMQException When there are no messages to publish.
     */
    public function publish(
        $messages,
        string $routingKey = '',
        ?string $connectionName = null,
        ?PublishConfig $publishConfig = null
    ): void {
        $messages = is_array($messages) ? $messages : [$messages];
        $publishConfig = $publishConfig ?? new PublishConfig();

        $defaultConfig = new Collection(
            $this->manager->getConfig()->get(RabbitMQManager::CONFIG_KEY . '.defaults')
        );

        $connectionName = $connectionName ?? $this->manager->resolveDefaultConfigName();
        // NOTE(review): the connection is resolved without $connectionName, while the
        // channel and configuration below are resolved for $connectionName. Confirm the
        // manager's no-argument default always matches the requested connection.
        $connection = $this->manager->resolveConnection();

        $channelId = $this->manager->resolveChannelId($publishConfig->get('channel_id'), $connectionName);
        $channel = $this->manager->resolveChannel($connectionName, $channelId, $connection);

        $connectionConfig = $this->manager->resolveConfig($connectionName);

        // Merge/Override default connection configuration with
        // the configuration specified for this publishing.
        // Publish config > Connection config
        $connectionOverrides = $publishConfig->getConnectionConfig();

        if ($connectionOverrides) {
            $connectionConfig = $connectionConfig->merge($connectionOverrides);
        }

        // The layers below do not depend on the individual message,
        // so they are merged once instead of once per message.

        // Publish config > Connection config > Default config
        $baseMessageConfig = array_merge(
            $defaultConfig->get('message', []), // Default config
            $connectionConfig->get('message', []), // Connection config
            $publishConfig->get('message', []) // Publish config
        );

        // Exchange properties
        // Publish config > Connection config > Default config
        $exchangeProperties = array_merge(
            $defaultConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Default properties
            $connectionConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Connection properties
            $publishConfig->get('exchange', ['properties' => []])['properties'] ?? [] // Publish properties
        );

        // Publish config > Connection config > Default config
        // NOTE(review): the merged properties are only passed as the fallback value of the
        // publish-config lookup, so they appear to be dropped whenever the publish config
        // defines its own 'exchange' entry. Behaviour is kept as-is; confirm it is intended.
        $baseExchangeConfig = array_merge(
            $defaultConfig->get('exchange', []), // Default config
            $connectionConfig->get('exchange', []), // Connection config
            $publishConfig->get('exchange', ['properties' => $exchangeProperties]) // Publish config
        );

        $readyMessages = [];

        foreach ($messages as $message) {
            // Message config > Publish config > Connection config > Default config
            $message->setConfig(array_merge($baseMessageConfig, $message->getConfig()->toArray()));

            $exchange = $message->getExchange();

            // Exchange config > Publish config > Connection config > Default config
            $exchangeConfig = array_merge(
                $baseExchangeConfig,
                $exchange ? $exchange->getConfig()->toArray() : []
            );

            if ($exchange) {
                $exchange->setConfig($exchangeConfig);
                // The exchange's own name wins over any 'name' that came from the merged config.
                $exchange->getConfig()->put('name', $exchange->getName());
            } else {
                // The message has no exchange: build one from the merged configuration.
                $message->setExchange(new RabbitMQExchange($exchangeConfig['name'] ?? '', $exchangeConfig));
            }

            $readyMessages[] = $message;
        }

        $this->publishBulk($readyMessages, $channel, $routingKey);
    }

    /**
     * Declare the exchanges used by the messages (when configured to do so)
     * and publish the messages in batches of at most the max batch size.
     *
     * @param  RabbitMQMessage[]  $messages
     * @param  AMQPChannel  $channel
     * @param  string  $routingKey
     * @return void
     *
     * @throws RabbitMQException When $messages is empty.
     */
    protected function publishBulk(array $messages, AMQPChannel $channel, string $routingKey = ''): void
    {
        if (count($messages) === 0) {
            throw new RabbitMQException('No messages to publish to the exchange.');
        }

        /**
         * One exchange per distinct exchange name; the first message seen
         * for a name supplies the exchange (and therefore its config).
         *
         * @var RabbitMQExchange[]
         */
        $uniqueExchanges = (new Collection(/** @scrutinizer ignore-type */ $messages))
            ->unique(function (RabbitMQMessage $message) {
                return $message->getExchange()->getName();
            })->map(function (RabbitMQMessage $message) {
                return $message->getExchange();
            });

        foreach ($uniqueExchanges as $exchange) {
            $exchangeConfig = $exchange->getConfig();

            // Only exchanges whose config enables 'declare' are declared.
            if (!$exchangeConfig->get('declare')) {
                continue;
            }

            $channel->exchange_declare(
                $exchange->getName(),
                $exchangeConfig->get('type'),
                $exchangeConfig->get('passive', false),
                $exchangeConfig->get('durable', true),
                $exchangeConfig->get('auto_delete', false),
                $exchangeConfig->get('internal', false),
                $exchangeConfig->get('nowait', false),
                (new AMQPTable($exchangeConfig->get('properties', [])))->getNativeData()
            );
        }

        $queued = 0;

        foreach ($messages as $message) {
            // Queue message for batch publish
            $channel->batch_basic_publish(
                new AMQPMessage($message->getStream(), $message->getConfig()->toArray()),
                $message->getExchange()->getName(),
                $routingKey
            );

            if (++$queued >= $this->maxBatchSize) {
                // Batch is full: publish it and start counting a new one.
                $channel->publish_batch();
                $queued = 0;
            }
        }

        // Publish whatever is left in the last, partially filled batch.
        $channel->publish_batch();
    }
}
193