Publisher::publishBatch()   F
last analyzed

Complexity

Conditions 18
Paths 210

Size

Total Lines 77
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 18.0471

Importance

Changes 5
Bugs 0 Features 0
Metric Value
eloc 51
c 5
b 0
f 0
dl 0
loc 77
ccs 36
cts 38
cp 0.9474
rs 3.9083
cc 18
nc 210
nop 4
crap 18.0471

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\Worker;
13
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
17
use PhpAmqpLib\Exception\AMQPTimeoutException;
18
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
19
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
20
use PhpAmqpLib\Exception\AMQPChannelClosedException;
21
use MAKS\AmqpAgent\Worker\AbstractWorker;
22
use MAKS\AmqpAgent\Worker\PublisherInterface;
23
use MAKS\AmqpAgent\Worker\WorkerFacilitationInterface;
24
use MAKS\AmqpAgent\Exception\AmqpAgentException as Exception;
25
use MAKS\AmqpAgent\Config\PublisherParameters as Parameters;
26
27
/**
28
 * A class specialized in publishing. Implementing only the methods needed for a publisher.
29
 *
30
 * Example:
31
 * ```
32
 * $publisher = new Publisher();
33
 * $publisher->connect();
34
 * $publisher->queue();
35
 * $publisher->exchange();
36
 * $publisher->bind();
37
 * $publisher->publish('Some message!');
38
 * $publisher->disconnect();
39
 * ```
40
 *
41
 * @since 1.0.0
42
 * @api
43
 */
44
class Publisher extends AbstractWorker implements PublisherInterface, WorkerFacilitationInterface
45
{
46
    /**
47
     * The default exchange options that the worker should use when no overrides are provided.
48
     * @var array
49
     */
50
    protected $exchangeOptions;
51
52
    /**
53
     * The default bind options that the worker should use when no overrides are provided.
54
     * @var array
55
     */
56
    protected $bindOptions;
57
58
    /**
59
     * The default message options that the worker should use when no overrides are provided.
60
     * @var array
61
     */
62
    protected $messageOptions;
63
64
    /**
65
     * The default publish options that the worker should use when no overrides are provided.
66
     * @var array
67
     */
68
    protected $publishOptions;
69
70
71
    /**
72
     * Publisher object constructor.
73
     * @param array $connectionOptions [optional] The overrides for the default connection options of the worker.
74
     * @param array $channelOptions [optional] The overrides for the default channel options of the worker.
75
     * @param array $queueOptions [optional] The overrides for the default queue options of the worker.
76
     * @param array $exchangeOptions [optional] The overrides for the default exchange options of the worker.
77
     * @param array $bindOptions [optional] The overrides for the default bind options of the worker.
78
     * @param array $messageOptions [optional] The overrides for the default message options of the worker.
79
     * @param array $publishOptions [optional] The overrides for the default publish options of the worker.
80
     */
81 13
    public function __construct(
82
        array $connectionOptions = [],
83
        array $channelOptions = [],
84
        array $queueOptions = [],
85
        array $exchangeOptions = [],
86
        array $bindOptions = [],
87
        array $messageOptions = [],
88
        array $publishOptions = []
89
    ) {
90 13
        $this->exchangeOptions = Parameters::patch($exchangeOptions, 'EXCHANGE_OPTIONS');
91 13
        $this->bindOptions     = Parameters::patch($bindOptions, 'BIND_OPTIONS');
92 13
        $this->messageOptions  = Parameters::patch($messageOptions, 'MESSAGE_OPTIONS');
93 13
        $this->publishOptions  = Parameters::patch($publishOptions, 'PUBLISH_OPTIONS');
94
95 13
        parent::__construct($connectionOptions, $channelOptions, $queueOptions);
96 13
    }
97
98
99
    /**
100
     * Declares an exchange on the default channel of the worker's connection to RabbitMQ server.
101
     * @param array|null $parameters [optional] The overrides for the default exchange options of the worker.
102
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
103
     * @return self
104
     * @throws AMQPTimeoutException
105
     */
106 8
    public function exchange(?array $parameters = null, ?AMQPChannel $_channel = null)
107
    {
108 8
        $changes = null;
109 8
        if ($parameters) {
110 4
            $changes = $this->mutateClassMember('exchangeOptions', $parameters);
111
        }
112
113 8
        $channel = $_channel ?: $this->channel;
114
115
        try {
116 8
            $channel->exchange_declare(
117 8
                $this->exchangeOptions['exchange'],
118 8
                $this->exchangeOptions['type'],
119 8
                $this->exchangeOptions['passive'],
120 8
                $this->exchangeOptions['durable'],
121 8
                $this->exchangeOptions['auto_delete'],
122 8
                $this->exchangeOptions['internal'],
123 8
                $this->exchangeOptions['nowait'],
124 8
                $this->exchangeOptions['arguments'],
125 8
                $this->exchangeOptions['ticket']
126
            );
127
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
128
            Exception::rethrow($error); // @codeCoverageIgnore
129
        }
130
131 8
        if ($changes) {
132 4
            $this->mutateClassMember('exchangeOptions', $changes);
133
        }
134
135 8
        return $this;
136
    }
137
138
    /**
139
     * Binds the default queue to the default exchange on the default channel of the worker's connection to RabbitMQ server.
140
     * @param array|null $parameters [optional] The overrides for the default bind options of the worker.
141
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
142
     * @return self
143
     * @throws AMQPTimeoutException
144
     */
145 7
    public function bind(?array $parameters = null, ?AMQPChannel $_channel = null)
146
    {
147 7
        $changes = null;
148 7
        if ($parameters) {
149 4
            $changes = $this->mutateClassMember('bindOptions', $parameters);
150
        }
151
152 7
        $channel = $_channel ?: $this->channel;
153
154
        try {
155 7
            $channel->queue_bind(
156 7
                $this->bindOptions['queue'],
157 7
                $this->bindOptions['exchange'],
158 7
                $this->bindOptions['routing_key'],
159 7
                $this->bindOptions['nowait'],
160 7
                $this->bindOptions['arguments'],
161 7
                $this->bindOptions['ticket']
162
            );
163
        } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore
164
            Exception::rethrow($error); // @codeCoverageIgnore
165
        }
166
167 7
        if ($changes) {
168 4
            $this->mutateClassMember('bindOptions', $changes);
169
        }
170
171 7
        return $this;
172
    }
173
174
    /**
175
     * Returns an AMQPMessage object.
176
     * @param string $body The body of the message.
177
     * @param array|null $properties [optional] The overrides for the default properties of the default message options of the worker.
178
     * @return AMQPMessage
179
     */
180 5
    public function message(string $body, ?array $properties = null): AMQPMessage
181
    {
182 5
        $changes = null;
183 5
        if ($properties) {
184 4
            $changes = $this->mutateClassSubMember('messageOptions', 'properties', $properties);
185
        }
186
187 5
        if ($body) {
188 5
            $this->messageOptions['body'] = $body;
189
        }
190
191 5
        $message = new AMQPMessage(
192 5
            $this->messageOptions['body'],
193 5
            $this->messageOptions['properties']
194
        );
195
196 5
        if ($changes) {
197 4
            $this->mutateClassSubMember('messageOptions', 'properties', $changes);
198
        }
199
200 5
        return $message;
201
    }
202
203
    /**
204
     * Publishes a message to the default exchange on the default channel of the worker's connection to RabbitMQ server.
205
     * @param string|array|AMQPMessage $payload A string of the body of the message or an array of body and properties for the message or a AMQPMessage object.
206
     * @param array|null $parameters [optional] The overrides for the default publish options of the worker.
207
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
208
     * @return self
209
     * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException
210
     */
211 5
    public function publish($payload, ?array $parameters = null, ?AMQPChannel $_channel = null)
212
    {
213 5
        $changes = null;
214 5
        if ($parameters) {
215 3
            $changes = $this->mutateClassMember('publishOptions', $parameters);
216
        }
217
218 5
        $channel = $_channel ?: $this->channel;
219
220 5
        $originalMessage = $this->publishOptions['msg'];
221
222 5
        $message = $payload ?: $originalMessage;
223
224 5
        if ($message instanceof AMQPMessage) {
225 1
            $this->publishOptions['msg'] = $message;
226 5
        } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) {
227 3
            $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']);
228 4
        } elseif (is_string($message)) {
229 2
            $this->publishOptions['msg'] = $this->message($message);
230
        } else {
231 2
            throw new Exception(
232 2
                sprintf(
233 2
                    'Payload must be a string, an array like %s, or an instance of "%s". The given parameter (data-type: %s) was none of them.',
234 2
                    '["body" => "Message body!", "properties" ["key" => "value"]]',
235 2
                    AMQPMessage::class,
236 2
                    is_object($payload) ? get_class($payload) : gettype($payload)
237
                )
238
            );
239
        }
240
241
        try {
242 3
            $channel->basic_publish(
243 3
                $this->publishOptions['msg'],
244 3
                $this->publishOptions['exchange'],
245 3
                $this->publishOptions['routing_key'],
246 3
                $this->publishOptions['mandatory'],
247 3
                $this->publishOptions['immediate'],
248 3
                $this->publishOptions['ticket']
249
            );
250
        } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore
251
            Exception::rethrow($error); // @codeCoverageIgnore
252 3
        } finally {
253
            // reverting messageOptions back to its state.
254 3
            $this->publishOptions['msg'] = $originalMessage;
255
        }
256
257 3
        if ($changes) {
258 2
            $this->mutateClassMember('publishOptions', $changes);
259
        }
260
261 3
        return $this;
262
    }
263
264
    /**
265
     * Publishes a batch of messages to the default exchange on the default channel of the worker's connection to RabbitMQ server.
266
     * @param string[]|array[]|AMQPMessage[] $messages An array of bodies of the messages or an array of arrays of body and properties for the messages or an array of AMQPMessage objects.
267
     * @param int $batchSize [optional] The number of messages that should be published per batch.
268
     * @param array|null $parameters [optional] The overrides for the default publish options of the worker.
269
     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.
270
     * @return self
271
     * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException
272
     */
273 2
    public function publishBatch(array $messages, int $batchSize = 2500, ?array $parameters = null, ?AMQPChannel $_channel = null)
274
    {
275 2
        $changes = null;
276 2
        if ($parameters) {
277 1
            $changes = $this->mutateClassMember('publishOptions', $parameters);
278
        }
279
280 2
        $channel = $_channel ?: $this->channel;
281
282 2
        $originalMessage = $this->publishOptions['msg'];
283
284 2
        $count = count($messages);
285 2
        for ($i = 0; $i < $count; $i++) {
286 2
            $payload = $messages[$i];
287
288 2
            $message = $payload ?: $originalMessage;
289
290 2
            if ($message instanceof AMQPMessage) {
291 2
                $this->publishOptions['msg'] = $message;
292 1
            } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) {
293
                $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']);
294 1
            } elseif (is_string($message)) {
295
                $this->publishOptions['msg'] = $this->message($message);
296
            } else {
297 1
                throw new Exception(
298 1
                    sprintf(
299 1
                        'Messages array elements must be either a string, an array like %s, or an instance of "%s". Element in index "%d" (data-type: %s) was none of them.',
300 1
                        '["body" => "Message body!", "properties" ["key" => "value"]]',
301 1
                        AMQPMessage::class,
302 1
                        $i,
303 1
                        is_object($payload) ? get_class($payload) : gettype($payload)
304
                    )
305
                );
306
            }
307
308 2
            $channel->batch_basic_publish(
309 2
                $this->publishOptions['msg'],
310 2
                $this->publishOptions['exchange'],
311 2
                $this->publishOptions['routing_key'],
312 2
                $this->publishOptions['mandatory'],
313 2
                $this->publishOptions['immediate'],
314 2
                $this->publishOptions['ticket']
315
            );
316
317 2
            if ($i % $batchSize == 0) {
318
                try {
319 2
                    $channel->publish_batch();
320
                    // @codeCoverageIgnoreStart
321
                } catch (AMQPConnectionBlockedException $e) {
322
                    $tries = -1;
323
                    do {
324
                        sleep(1);
325
                        $tries++;
326
                    } while ($this->connection->isBlocked() && $tries >= 60);
327
328
                    $channel->publish_batch();
329
                } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) {
330
                    Exception::rethrow($error);
331
                    // @codeCoverageIgnoreEnd
332
                }
333
            }
334
        }
335
336
        try {
337 1
            $channel->publish_batch();
338
        } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore
339
            Exception::rethrow($error); // @codeCoverageIgnore
340 1
        } finally {
341
            // reverting messageOptions back to its state.
342 1
            $this->publishOptions['msg'] = $originalMessage;
343
        }
344
345 1
        if ($changes) {
346 1
            $this->mutateClassMember('publishOptions', $changes);
347
        }
348
349 1
        return $this;
350
    }
351
352
    /**
353
     * Executes `self::connect()`, `self::queue()`, `self::exchange`, and `self::bind()` respectively.
354
     * @return self
355
     */
356 3
    public function prepare()
357
    {
358 3
        $this->connect();
359 3
        $this->queue();
360 3
        $this->exchange();
361 3
        $this->bind();
362
363 3
        return $this;
364
    }
365
366
    /**
367
     * Executes `self::connect()`, `self::queue()`, `self::exchange`, `self::bind()`, `self::publish()`, and `self::disconnect()` respectively.
368
     * @param string[]|array[]|AMQPMessage[] $messages An array of strings, arrays, or AMQPMessage objects (same as `self::publishBatch()`).
369
     * @return void
370
     * @throws Exception
371
     */
372 2
    public function work($messages): void
373
    {
374
        try {
375 2
            $this->prepare();
376 2
            foreach ($messages as $message) {
377 2
                $this->publish($message);
378
            }
379 1
            $this->disconnect();
380 1
        } catch (Exception $error) {
381 1
            Exception::rethrow($error, null, false);
382
        }
383 1
    }
384
}
385