Completed
Push — master ( 53783e...ee648a )
by Olivier
22s queued 20s
created

MessagePublisher/PhpAmqpLibMessagePublisher.php (1 issue)

Labels
Severity
1
<?php
2
3
namespace Swarrot\Broker\MessagePublisher;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Message\AMQPMessage;
7
use Swarrot\Broker\Message;
8
9
/**
 * Publishes Swarrot messages to a single exchange through a php-amqplib channel.
 *
 * When publisher confirms are enabled, the channel is switched to confirm mode
 * at construction time and every publish() call waits for pending acks before
 * returning; a nack from the broker is turned into an exception.
 */
class PhpAmqpLibMessagePublisher implements MessagePublisherInterface
{
    /** @var AMQPChannel Channel used for every publish. */
    private $channel;

    /** @var string Exchange's name. Required by php-amqplib */
    private $exchange;

    /** @var int Value passed as-is to AMQPChannel::wait_for_pending_acks(). */
    private $timeout;

    /** @var bool Whether publish() waits for broker confirmation. */
    private $publisherConfirms;

    /**
     * @param AMQPChannel $channel           Channel the messages are published on.
     * @param string      $exchange          Name of the target exchange.
     * @param bool        $publisherConfirms Enable confirm mode on the channel.
     * @param int         $timeout           Forwarded to wait_for_pending_acks() when
     *                                       publisher confirms are enabled.
     *
     * @throws \Exception When publisher confirms are requested but the channel
     *                    has no set_nack_handler() method.
     */
    public function __construct(
        AMQPChannel $channel,
        string $exchange,
        bool $publisherConfirms = false,
        int $timeout = 0
    ) {
        $this->channel = $channel;
        $this->exchange = $exchange;
        $this->publisherConfirms = $publisherConfirms;
        $this->timeout = $timeout;

        if ($publisherConfirms) {
            // Feature detection: channels without set_nack_handler() cannot
            // report negative acknowledgements.
            if (!method_exists($this->channel, 'set_nack_handler')) {
                throw new \Exception('Publisher confirms are not supported. Update your php amqplib package to >=2.2');
            }

            // The nack handler is registered before confirm mode is selected.
            $this->channel->set_nack_handler($this->getNackHandler());
            $this->channel->confirm_select();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Entries found under the "headers" property are copied into
     * "application_headers" as [type, value] pairs, tagged 'A' for arrays,
     * 'I' for integers and 'S' for everything else. An existing
     * "application_headers" entry with the same name is overwritten.
     *
     * A null body is published as an empty string and a null key as an
     * empty routing key.
     */
    public function publish(Message $message, ?string $key = null): void
    {
        $properties = $message->getProperties();
        if (isset($properties['headers'])) {
            if (!isset($properties['application_headers'])) {
                $properties['application_headers'] = [];
            }

            foreach ($properties['headers'] as $header => $value) {
                if (\is_array($value)) {
                    $type = 'A';
                } elseif (\is_int($value)) {
                    $type = 'I';
                } else {
                    $type = 'S';
                }

                $properties['application_headers'][$header] = [$type, $value];
            }
        }

        $amqpMessage = new AMQPMessage($message->getBody() ?? '', $properties);

        $this->channel->basic_publish($amqpMessage, $this->exchange, (string) $key);
        if ($this->publisherConfirms) {
            // Wait for the broker's answer; a nack triggers the handler
            // registered in the constructor.
            $this->channel->wait_for_pending_acks($this->timeout);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function getExchangeName(): string
    {
        return $this->exchange;
    }

    /**
     * Builds the callback registered as the channel's nack handler.
     *
     * The callback always throws: the error mentions the delivery tag when
     * the message carries a scalar one, and the message body otherwise.
     */
    private function getNackHandler(): callable
    {
        // Static: the handler does not use $this, so the channel does not
        // keep a reference back to this publisher.
        return static function (AMQPMessage $message) {
            if ($message->has('delivery_tag') && is_scalar($message->get('delivery_tag'))) {
                throw new \Exception('Error publishing deliveryTag: './** @scrutinizer ignore-type */ $message->get('delivery_tag'));
            }

            throw new \Exception('Error publishing message: '.$message->getBody());
        };
    }
}
90