RabbitMq3Transport   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 45
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 9
Bugs 1 Features 0
Metric Value
eloc 24
c 9
b 1
f 0
dl 0
loc 45
ccs 0
cts 20
cp 0
rs 10
wmc 5

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A send() 0 23 3
A getKey() 0 3 1
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Transport;
10
11
use Daikon\Interop\Assertion;
12
use Daikon\MessageBus\Channel\Subscription\Transport\TransportInterface;
13
use Daikon\MessageBus\EnvelopeInterface;
14
use Daikon\MessageBus\MessageBusInterface;
15
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use PhpAmqpLib\Wire\AMQPTable;
19
20
/**
 * Message bus transport that publishes envelopes to a RabbitMQ exchange.
 *
 * The target exchange, the routing key and optional AMQP message properties
 * are read from the envelope metadata using the keys declared as constants
 * on this class.
 */
final class RabbitMq3Transport implements TransportInterface
{
    /** Metadata key holding the name of the exchange to publish to (required). */
    public const EXCHANGE = 'exchange';

    /** Metadata key holding the routing key (optional, defaults to an empty string). */
    public const ROUTING_KEY = 'routing_key';

    /** Metadata key holding custom headers; also used as the AMQP property name. */
    public const APPLICATION_HEADERS = 'application_headers';

    /** Metadata key holding the message expiration; also used as the AMQP property name. */
    public const EXPIRATION = 'expiration';

    private string $key;

    private RabbitMq3Connector $connector;

    /**
     * @param string $key Identifier of this transport, returned by getKey().
     * @param RabbitMq3Connector $connector Connector whose connection is used for publishing.
     */
    public function __construct(string $key, RabbitMq3Connector $connector)
    {
        $this->key = $key;
        $this->connector = $connector;
    }

    /**
     * Serializes the envelope to JSON and publishes it as a persistent AMQP message.
     *
     * The exchange name is asserted to be non-blank and the routing key to be a
     * string before anything is encoded or published. The message bus argument
     * is not used by this transport.
     *
     * @throws \JsonException When the envelope cannot be encoded as JSON.
     */
    public function send(EnvelopeInterface $envelope, MessageBusInterface $messageBus): void
    {
        $metadata = $envelope->getMetadata();
        $exchange = $metadata->get(self::EXCHANGE);
        $routingKey = $metadata->get(self::ROUTING_KEY, '');

        Assertion::notBlank($exchange, 'Exchange name must not be blank.');
        Assertion::string($routingKey, 'Routing key must be a string.');

        // Without JSON_THROW_ON_ERROR json_encode() returns false on failure and
        // that value would be published as the message body; fail loudly instead.
        $payload = json_encode($envelope->toNative(), JSON_THROW_ON_ERROR);

        $properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        if ($metadata->has(self::APPLICATION_HEADERS)) {
            $properties[self::APPLICATION_HEADERS] = new AMQPTable($metadata->get(self::APPLICATION_HEADERS));
        }
        if ($metadata->has(self::EXPIRATION)) {
            $properties[self::EXPIRATION] = $metadata->get(self::EXPIRATION);
        }

        $amqpMessage = new AMQPMessage($payload, $properties);

        // NOTE(review): a channel is requested on every send and is not closed
        // here - confirm the connector/connection manages the channel lifecycle.
        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        $channel->basic_publish($amqpMessage, $exchange, $routingKey);
    }

    /**
     * Returns the key this transport was constructed with.
     */
    public function getKey(): string
    {
        return $this->key;
    }
}
67