Publisher::attemptToPublish()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 19
c 1
b 0
f 0
dl 0
loc 29
ccs 0
cts 22
cp 0
rs 9.6333
cc 3
nc 4
nop 1
crap 12
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Umbrellio\TableSync\Rabbit;
6
7
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
8
use PhpAmqpLib\Exception\AMQPRuntimeException;
9
use Psr\Log\LoggerInterface;
10
use Psr\Log\NullLogger;
11
use Umbrellio\TableSync\Messages\PublishMessage;
12
use Umbrellio\TableSync\Publisher as PublisherContract;
13
use Umbrellio\TableSync\Rabbit\Config\Publisher as Config;
14
use Umbrellio\TableSync\Rabbit\Exceptions\MaxAttemptsExceeded;
15
16
/**
 * Publishes messages to the configured exchange.
 *
 * When the AMQP library raises a runtime or closed-connection exception the
 * connection is re-established and the publish is retried, up to the number
 * of attempts returned by the publisher config.
 */
final class Publisher implements PublisherContract
{
    /** @var Config */
    private $config;

    /** @var ConnectionContainer */
    private $connectionContainer;

    /** @var MessageBuilder */
    private $messageBuilder;

    /** @var LoggerInterface */
    private $logger;

    /**
     * @param Config              $config              Source of the exchange name, attempt limit and confirm-select flag.
     * @param ConnectionContainer $connectionContainer Provides the connection and is asked to reconnect after a failure.
     * @param MessageBuilder      $messageBuilder      Builds the AMQP message that is actually published.
     * @param LoggerInterface|null $logger             Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(
        Config $config,
        ConnectionContainer $connectionContainer,
        MessageBuilder $messageBuilder,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->connectionContainer = $connectionContainer;
        $this->messageBuilder = $messageBuilder;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Publishes the message, retrying after AMQP failures.
     *
     * @throws MaxAttemptsExceeded When every allowed attempt failed.
     */
    public function publish(PublishMessage $message): void
    {
        $this->tryPublish($message);
    }

    /**
     * Runs publish attempts until one succeeds or the attempt limit is reached.
     *
     * Implemented as a loop so that a long retry sequence does not deepen the
     * call stack.
     *
     * @param int $attempt Number of the first attempt (1-based).
     *
     * @throws MaxAttemptsExceeded When the attempt number reaches the configured limit.
     */
    private function tryPublish(PublishMessage $message, int $attempt = 1): void
    {
        while (true) {
            try {
                $this->attemptToPublish($message);

                return;
            } catch (AMQPRuntimeException | AMQPConnectionClosedException $exception) {
                // Record the underlying failure; otherwise it is lost once we
                // retry or replace it with MaxAttemptsExceeded.
                $this->logger->warning('Message publishing failed', [
                    'direction' => 'publish',
                    'attempt' => $attempt,
                    'exception' => $exception,
                ]);

                // Reconnect even after the last attempt, exactly as before,
                // so the container is refreshed before the exception leaves.
                $this->connectionContainer->reconnect();

                // ">=" rather than "===": a configured limit below 1 must
                // still terminate instead of retrying forever.
                if ($attempt >= $this->config->attempts()) {
                    throw new MaxAttemptsExceeded("Publisher tried {$attempt} times.");
                }

                ++$attempt;
            }
        }
    }

    /**
     * Performs a single publish attempt and logs the published message.
     *
     * When confirm-select is enabled in the config, the channel is switched
     * to confirm mode before publishing and the method waits for pending
     * acks/returns afterwards.
     */
    private function attemptToPublish(PublishMessage $message): void
    {
        // NOTE(review): a channel is requested on every attempt and is never
        // closed here. Verify whether connection()->channel() hands back a
        // reused channel; if it opens a new one each time, channels may
        // accumulate on a long-lived connection.
        $channel = $this->connectionContainer->connection()
            ->channel();

        $confirmSelect = $this->config->confirmSelect();
        if ($confirmSelect) {
            $channel->confirm_select();
        }

        $amqpMessage = $this->messageBuilder->buildForPublishing($message);
        $exchange = $this->config->exchangeName();
        $routingKey = $message->routingKey();

        // NOTE(review): the fourth argument is presumably php-amqplib's
        // "mandatory" flag, i.e. it is only set when confirms are enabled.
        // Confirm against the installed library version.
        $channel->basic_publish(
            $amqpMessage,
            $exchange,
            $routingKey,
            $confirmSelect
        );

        $this->logger->info('Message publishing', [
            'direction' => 'publish',
            'body' => $amqpMessage->getBody(),
            'props' => $amqpMessage->get_properties(),
            'exchange' => $exchange,
            'routing_key' => $routingKey,
        ]);

        if ($confirmSelect) {
            $channel->wait_for_pending_acks_returns();
        }
    }
}
87