Passed
Branch master (74fab5)
by Kunal
03:19
created

RabbitMQPublisher::publishBulk()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 53
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 5
eloc 30
c 1
b 0
f 1
nc 4
nop 3
dl 0
loc 53
rs 9.1288


Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
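As a minimal sketch of that advice, the example below turns a commented step into a method named after the comment. All names here (`totalBefore`, `totalAfter`, `applyTaxAndRoundToCents`) are hypothetical and not taken from the reviewed class.

```php
<?php

// Hypothetical example: none of these functions exist in the reviewed code.

// Before: a comment marks a distinct step inside a longer function.
function totalBefore(array $prices, float $taxRate): float
{
    $sum = 0.0;
    foreach ($prices as $price) {
        $sum += $price;
    }

    // Apply tax and round to cents
    return round($sum * (1 + $taxRate), 2);
}

// After: the commented step becomes its own function, and the comment
// supplies the name.
function applyTaxAndRoundToCents(float $amount, float $taxRate): float
{
    return round($amount * (1 + $taxRate), 2);
}

function totalAfter(array $prices, float $taxRate): float
{
    return applyTaxAndRoundToCents(array_sum($prices), $taxRate);
}
```

Both versions should return the same value for the same input; the second simply reads as a sequence of named steps.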

Commonly applied refactorings include Extract Method.

<?php

namespace Kunnu\RabbitMQ;

use PhpAmqpLib\Wire\AMQPTable;
use Illuminate\Support\Collection;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQPublisher
{
    /**
     * Maximum batch size.
     *
     * @var int
     */
    protected int $maxBatchSize = 200;

    /**
     * RabbitMQ Manager.
     *
     * @var RabbitMQManager $manager
     */
    protected RabbitMQManager $manager;

    /**
     * Create a new RabbitMQ Publisher instance.
     *
     * @param RabbitMQManager $manager
     */
    public function __construct(RabbitMQManager $manager)
    {
        $this->manager = $manager;
    }

    /**
     * Set the max batch size.
     *
     * @param integer $size
     * @return self
     */
    public function setMaxBatchSize(int $size): self
    {
        $this->maxBatchSize = $size;

        return $this;
    }

    /**
     * Publish message(s).
     *
     * @param RabbitMQMessage[]|RabbitMQMessage $messages
     * @param string $routingKey
     * @param string $connectionName
     * @param PublishConfig $config
     *
     * @return void
     */
    public function publish(
        $messages,
        string $routingKey = '',
        string $connectionName = null,
        PublishConfig $publishConfig = null
    ): void {
        $messages = !is_array($messages) ? [$messages] : $messages;
        $publishConfig = $publishConfig ?? new PublishConfig();

        $defaultConfig = new Collection($this->manager->getConfig()->get(RabbitMQManager::CONFIG_KEY . ".defaults"));

        $connectionName = $connectionName ?? $this->manager->resolveDefaultConfigName();
        $connection = $this->manager->resolveConnection();

        $channelId = $this->manager->resolveChannelId($publishConfig->get("channel_id"), $connectionName);
        $channel = $this->manager->resolveChannel($connectionName, $channelId, $connection);

        // Scrutinizer issue (bug): it seems like $connectionName can also be of
        // type null; however, parameter $connectionName of
        // Kunnu\RabbitMQ\RabbitMQManager::resolveConfig() does only seem to accept
        // string. Maybe add an additional type check? If this is a false positive,
        // it can be ignored via the ignore-type annotation:
        //   $connectionConfig = $this->manager->resolveConfig(/** @scrutinizer ignore-type */ $connectionName);
        $connectionConfig = $this->manager->resolveConfig($connectionName);

        // Merge/Override default connection configuration with
        // the configuration specified for this publishing.
        if ($publishConfig && $publishConfig->getConnectionConfig()) {
            // Publish config > Connection config
            $connectionConfig = $connectionConfig->merge($publishConfig->getConnectionConfig());
        }

        $readyMessages = [];

        foreach ($messages as $message) {
            // Merge message configuration
            // Message config > Publish config > Connection config > Default config
            $messageConfig = array_merge(
                $defaultConfig->get('message', []), // Default config
                $connectionConfig->get('message', []), // Connection config
                $publishConfig->get('message', []), // Publish config
                $message->getConfig()->toArray(), // Message config
            );
            // Override the message config
            $message->setConfig($messageConfig);

            // Merge the exchange properties
            // Publish config > Connection config > Default config
            $exchangeProperties = array_merge(
                $defaultConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Default properties
                $connectionConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Connection properties
                $publishConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Publish properties
            );

            // Merge the exchange config
            // Exchange config > Publish config > Connection config > Default config
            $exchangeConfig = array_merge(
                $defaultConfig->get('exchange', []), // Default config
                $connectionConfig->get('exchange', []), // Connection config
                $publishConfig->get('exchange', ['properties' => $exchangeProperties]), // Publish config,
                $message->getExchange() ? $message->getExchange()->getConfig()->toArray() : [], // Exchange config
            );

            // Merge message exchange configuration
            if ($message->getExchange()) {
                $message->getExchange()->setConfig($exchangeConfig);
                $message->getExchange()->getConfig()->put('name', $message->getExchange()->getName());
            } else {
                $message->setExchange(new RabbitMQExchange($exchangeConfig['name'] ?? '', $exchangeConfig));
            }

            $readyMessages[] = $message;
        }

        $this->publishBulk($readyMessages, $channel, $routingKey);
    }

    /**
     * @param RabbitMQMessage[] $messages
     * @param AMQPChannel $channel
     * @param string $routingKey
     *
     * @throws RabbitMQException
     */
    protected function publishBulk(array $messages, AMQPChannel $channel, string $routingKey = ''): void
    {
        if (count($messages) === 0) {
            throw new RabbitMQException('No messages to publish to the exchange.');
        }

        // Scrutinizer issue (unused code): the assignment to $uniqueExchanges
        // is dead and can be removed.
        /**
         * @var RabbitMQExchange[]
         */
        $uniqueExchanges = (new Collection($messages))
            ->unique(function (RabbitMQMessage $message) {
                return $message->getExchange()->getName();
            })->map(function (RabbitMQMessage $message) {
                return $message->getExchange();
            })->each(function (RabbitMQExchange $exchange) use ($channel) {
                $exchangeConfig = $exchange->getConfig();

                if ($exchangeConfig->get('declare')) {
                    $channel->exchange_declare(
                        $exchange->getName(),
                        $exchangeConfig->get('type'),
                        $exchangeConfig->get('passive', false),
                        $exchangeConfig->get('durable', true),
                        $exchangeConfig->get('auto_delete', false),
                        $exchangeConfig->get('internal', false),
                        $exchangeConfig->get('nowait', false),
                        // Scrutinizer issue (bug): the AMQPTable instance below is
                        // incompatible with the type array expected by parameter
                        // $arguments of PhpAmqpLib\Channel\AMQPChannel::exchange_declare().
                        // If this is a false positive, it can be ignored by prefixing
                        // the argument with /** @scrutinizer ignore-type */.
                        new AMQPTable($exchangeConfig->get('properties', []))
                    );
                }
            });

        $max = $this->maxBatchSize;

        foreach ($messages as $message) {
            // Queue message for batch publish
            $channel->batch_basic_publish(
                new AMQPMessage($message->getStream(), $message->getConfig()->toArray()),
                $message->getExchange()->getName(),
                $routingKey,
            );

            $batchReadyToBePublished = --$max <= 0;

            if ($batchReadyToBePublished) {
                // Publish all the messages in the batch
                $channel->publish_batch();
                // Reset batch counter
                $max = $this->maxBatchSize;
            }
        }

        // Publish all the remaining batches
        $channel->publish_batch();
    }
}
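
One way the Long Method finding on publishBulk() might be addressed is to extract the batching loop into its own method. The sketch below is an illustration under stated assumptions, not the project's code: `RecordingChannel` is a hypothetical stand-in that only mimics the `batch_basic_publish()` and `publish_batch()` calls used above, message bodies are plain strings, and `publishInBatches()` is an invented name. It also swaps the countdown counter for `array_chunk()`, so every flush corresponds to exactly one chunk.

```php
<?php

// Hypothetical stand-in for the channel: it records calls instead of talking
// to a broker, so the sketch runs without php-amqplib installed.
final class RecordingChannel
{
    /** @var string[] Messages queued but not yet flushed. */
    public array $pending = [];

    /** @var int[] Size of each flushed batch, in order. */
    public array $flushedBatchSizes = [];

    public function batch_basic_publish(string $body, string $exchange = '', string $routingKey = ''): void
    {
        $this->pending[] = $body;
    }

    public function publish_batch(): void
    {
        // Assumption of this stub: flushing an empty batch does nothing.
        if ($this->pending === []) {
            return;
        }

        $this->flushedBatchSizes[] = count($this->pending);
        $this->pending = [];
    }
}

// Extracted step (hypothetical name): queue the messages and flush once per
// chunk of at most $maxBatchSize messages.
function publishInBatches(
    RecordingChannel $channel,
    array $bodies,
    int $maxBatchSize,
    string $exchange = '',
    string $routingKey = ''
): void {
    // array_chunk() requires a size of at least 1.
    foreach (array_chunk($bodies, max(1, $maxBatchSize)) as $batch) {
        foreach ($batch as $body) {
            $channel->batch_basic_publish($body, $exchange, $routingKey);
        }

        $channel->publish_batch();
    }
}

// Usage: five messages with a batch size of two.
$channel = new RecordingChannel();
publishInBatches($channel, ['a', 'b', 'c', 'd', 'e'], 2);
// $channel->flushedBatchSizes is now [2, 2, 1]
```

With the loop extracted, publishBulk() would be left with three short steps: the empty check, the exchange declaration, and a call to the batching method.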
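
The precedence comments in publish() ("Message config > Publish config > Connection config > Default config") rely on how `array_merge()` treats string keys: later arguments override earlier ones. The following sketch uses made-up configuration values to show that ordering, and also that the merge is shallow, which may be why the listing merges the exchange `properties` separately.

```php
<?php

// Hypothetical config values, ordered from lowest to highest precedence.
$defaultConfig    = ['delivery_mode' => 1, 'content_type' => 'text/plain'];
$connectionConfig = ['delivery_mode' => 2];
$publishConfig    = ['content_type' => 'application/json'];
$messageConfig    = ['priority' => 5];

// For string keys, the value from the last array that defines the key wins.
$merged = array_merge($defaultConfig, $connectionConfig, $publishConfig, $messageConfig);
// $merged: delivery_mode => 2, content_type => 'application/json', priority => 5

// array_merge() is shallow: a nested array is replaced as a whole rather than
// merged key by key.
$shallow = array_merge(
    ['properties' => ['x-first' => 1]],
    ['properties' => ['x-second' => 2]]
);
// $shallow['properties'] contains only 'x-second'
```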