Producer   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 74
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 9
Bugs 0 Features 0
Metric Value
eloc 32
dl 0
loc 74
rs 10
c 9
b 0
f 0
ccs 0
cts 26
cp 0
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A setDeliveryMode() 0 5 1
A getBasicProperties() 0 3 1
A setDefaultRoutingKey() 0 5 1
A publish() 0 33 4
A setContentType() 0 5 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
10
/**
11
 * Producer, that publishes AMQP Messages
12
 */
13
class Producer extends BaseAmqp implements ProducerInterface
14
{
15
    public const DEFAULT_CONTENT_TYPE = 'text/plain';
16
    protected $contentType = Producer::DEFAULT_CONTENT_TYPE;
17
    protected $deliveryMode = 2;
18
    protected $defaultRoutingKey = '';
19
20
    public function setContentType($contentType)
21
    {
22
        $this->contentType = $contentType;
23
24
        return $this;
25
    }
26
27
    public function setDeliveryMode($deliveryMode)
28
    {
29
        $this->deliveryMode = $deliveryMode;
30
31
        return $this;
32
    }
33
34
    public function setDefaultRoutingKey($defaultRoutingKey)
35
    {
36
        $this->defaultRoutingKey = $defaultRoutingKey;
37
38
        return $this;
39
    }
40
41
    protected function getBasicProperties()
42
    {
43
        return ['content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode];
44
    }
45
46
    /**
47
     * Publishes the message and merges additional properties with basic properties
48
     *
49
     * @param string $msgBody
50
     * @param string $routingKey
51
     * @param array $additionalProperties
52
     * @param array $headers
53
     */
54
    public function publish($msgBody, $routingKey = null, $additionalProperties = [], ?array $headers = null)
55
    {
56
        if ($this->autoSetupFabric) {
57
            $this->setupFabric();
58
        }
59
60
        $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));
61
62
        if (!empty($headers)) {
63
            $headersTable = new AMQPTable($headers);
64
            $msg->set('application_headers', $headersTable);
65
        }
66
67
        $real_routingKey = $routingKey !== null ? $routingKey : $this->defaultRoutingKey;
68
69
        $this->dispatchEvent(
70
            BeforeProducerPublishMessageEvent::NAME,
71
            new BeforeProducerPublishMessageEvent($this, $msg, $real_routingKey)
72
        );
73
74
        $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey);
75
        $this->logger->debug('AMQP message published', [
76
            'amqp' => [
77
                'body' => $msgBody,
78
                'routingkey' => $real_routingKey,
79
                'properties' => $additionalProperties,
80
                'headers' => $headers,
81
            ],
82
        ]);
83
84
        $this->dispatchEvent(
85
            AfterProducerPublishMessageEvent::NAME,
86
            new AfterProducerPublishMessageEvent($this, $msg, $real_routingKey)
87
        );
88
    }
89
}
90