Completed
Push — master ( 55d787...d81582 )
by Mihai
15s queued 13s
created

Producer   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 61
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 11
Bugs 0 Features 0
Metric Value
eloc 25
dl 0
loc 61
ccs 0
cts 34
cp 0
rs 10
c 11
b 0
f 0
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A setDeliveryMode() 0 5 1
A getBasicProperties() 0 3 1
A setDefaultRoutingKey() 0 5 1
A publish() 0 21 4
A setContentType() 0 5 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use PhpAmqpLib\Wire\AMQPTable;
7
8
/**
 * AMQP producer.
 *
 * Wraps a message body in an AMQPMessage, applies the configured default
 * properties (content type and delivery mode) and publishes it to the
 * exchange named in the inherited exchange options.
 */
class Producer extends BaseAmqp implements ProducerInterface
{
    /**
     * Sent as the "content_type" property of every published message.
     */
    protected $contentType = 'text/plain';

    /**
     * Sent as the "delivery_mode" property of every published message.
     */
    protected $deliveryMode = 2;

    /**
     * Routing key used by publish() when the caller passes null.
     */
    protected $defaultRoutingKey = '';

    /**
     * Replaces the content type applied to subsequently published messages.
     *
     * @param mixed $contentType stored as given, without validation
     *
     * @return $this
     */
    public function setContentType($contentType)
    {
        $this->contentType = $contentType;

        return $this;
    }

    /**
     * Replaces the delivery mode applied to subsequently published messages.
     *
     * @param mixed $deliveryMode stored as given, without validation
     *
     * @return $this
     */
    public function setDeliveryMode($deliveryMode)
    {
        $this->deliveryMode = $deliveryMode;

        return $this;
    }

    /**
     * Replaces the routing key that publish() falls back to.
     *
     * @param mixed $defaultRoutingKey stored as given; cast to string at publish time
     *
     * @return $this
     */
    public function setDefaultRoutingKey($defaultRoutingKey)
    {
        $this->defaultRoutingKey = $defaultRoutingKey;

        return $this;
    }

    /**
     * Builds the default message properties from the current configuration.
     *
     * @return array with the keys "content_type" and "delivery_mode"
     */
    protected function getBasicProperties()
    {
        $properties = array();
        $properties['content_type'] = $this->contentType;
        $properties['delivery_mode'] = $this->deliveryMode;

        return $properties;
    }

    /**
     * Publishes a message to the configured exchange.
     *
     * The message properties are the basic properties merged with
     * $additionalProperties. When $headers is non-empty it is attached
     * to the message as its "application_headers" table.
     *
     * @param mixed       $msgBody              cast to string before sending
     * @param string|null $routingKey           null selects the default routing key
     * @param array       $additionalProperties merged over the basic properties
     * @param array|null  $headers              optional application headers
     */
    public function publish($msgBody, $routingKey = null, $additionalProperties = array(), array $headers = null)
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $body = (string) $msgBody;
        // array_merge lets caller-supplied properties replace defaults with the same key.
        $properties = array_merge($this->getBasicProperties(), $additionalProperties);
        $message = new AMQPMessage($body, $properties);

        if (!empty($headers)) {
            $message->set('application_headers', new AMQPTable($headers));
        }

        // Only null triggers the fallback; an explicit empty string is used as-is.
        if ($routingKey !== null) {
            $effectiveRoutingKey = $routingKey;
        } else {
            $effectiveRoutingKey = $this->defaultRoutingKey;
        }

        $channel = $this->getChannel();
        $channel->basic_publish($message, $this->exchangeOptions['name'], (string) $effectiveRoutingKey);

        // The log context carries the arguments exactly as they were passed in,
        // so "routingkeys" is null whenever the default routing key was used.
        $context = array(
            'amqp' => array(
                'body' => $msgBody,
                'routingkeys' => $routingKey,
                'properties' => $additionalProperties,
                'headers' => $headers,
            ),
        );
        $this->logger->debug('AMQP message published', $context);
    }
}
76