Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

Producer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Producer;
4
5
use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry;
6
use OldSound\RabbitMqBundle\Declarations\Declarator;
7
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\NullLogger;
13
14
/**
 * Producer that publishes AMQP messages to a single exchange through the
 * channel it was constructed with.
 *
 * Content type and delivery mode are applied to every published message
 * unless overridden per call via the additional properties of publish().
 */
class Producer implements ProducerInterface
{
    use LoggerAwareTrait;

    /** @var AMQPChannel Channel used for basic_publish calls */
    protected $channel;

    /** @var string Name of the exchange messages are published to */
    protected $exchange;

    /**
     * @var bool Whether the exchange should be declared before publishing.
     *           NOTE: the declaration itself is not implemented yet (see publish()).
     */
    protected $autoDeclare;

    /** @var string Default "content_type" message property */
    protected $contentType = 'text/plain';

    /** @var int Default "delivery_mode" message property */
    protected $deliveryMode = self::DELIVERY_MODE_PERSISTENT;

    /**
     * @param AMQPChannel $channel     Channel the messages are published on
     * @param string      $exchange    Exchange name passed to basic_publish
     * @param bool        $autoDeclare Stored for auto-declaration; currently has no effect
     */
    public function __construct(AMQPChannel $channel, string $exchange, bool $autoDeclare = false)
    {
        $this->channel = $channel;
        $this->exchange = $exchange;
        $this->autoDeclare = $autoDeclare;

        // Start with a no-op logger so publish() can log unconditionally;
        // a real logger can be injected through LoggerAwareTrait::setLogger().
        $this->logger = new NullLogger();
    }

    /**
     * Sets the default "content_type" property for published messages.
     *
     * @param string $contentType
     *
     * @return Producer This instance, for method chaining
     */
    public function setContentType(string $contentType): Producer
    {
        $this->contentType = $contentType;

        return $this;
    }

    /**
     * Sets the default "delivery_mode" property for published messages.
     *
     * @param int $deliveryMode
     *
     * @return Producer This instance, for method chaining
     */
    public function setDeliveryMode(int $deliveryMode): Producer
    {
        $this->deliveryMode = $deliveryMode;

        return $this;
    }

    /**
     * Publishes the message and merges additional properties with basic properties.
     *
     * The basic properties are the configured content type and delivery mode;
     * entries in $additionalProperties with the same keys take precedence.
     *
     * @param mixed      $msgBody              Message body; cast to string before sending
     * @param string     $routingKey           Routing key passed to basic_publish
     * @param array      $additionalProperties Extra message properties, overriding the defaults
     * @param array|null $headers              Application headers; skipped when null or empty
     */
    public function publish($msgBody, string $routingKey = '', array $additionalProperties = [], ?array $headers = null)
    {
        if ($this->autoDeclare) {
            // Auto-declaration is not implemented yet; this branch is intentionally a no-op.
            // TODO (new Declarator($this->channel))->declareForExchange($this->exchange);
        }

        // array_merge lets caller-supplied properties override the defaults on key collision.
        $properties = array_merge([
            'content_type' => $this->contentType,
            'delivery_mode' => $this->deliveryMode,
        ], $additionalProperties);

        $msg = new AMQPMessage((string) $msgBody, $properties);

        if (!empty($headers)) {
            $msg->set('application_headers', new AMQPTable($headers));
        }

        $this->channel->basic_publish($msg, $this->exchange, $routingKey);

        $this->logger->debug('AMQP message published', [
            'amqp' => [
                'body' => $msgBody,
                'routingkeys' => $routingKey,
                'properties' => $additionalProperties,
                'headers' => $headers,
            ],
        ]);
    }
}
88