Passed
Pull Request — master (#662)
by
unknown
09:20
created

Producer   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 114
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 13
Bugs 0 Features 0
Metric Value
eloc 45
c 13
b 0
f 0
dl 0
loc 114
ccs 0
cts 26
cp 0
rs 10
wmc 13

9 Methods

Rating   Name   Duplication   Size   Complexity  
A reconnect() 0 4 1
A __construct() 0 5 1
A publish() 0 25 4
A initializeProducer() 0 13 2
A setDeliveryMode() 0 5 1
A getBasicProperties() 0 3 1
A setDefaultRoutingKey() 0 5 1
A setContentType() 0 5 1
A setConfirmationTimeout() 0 3 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
10
/**
11
 * Producer, that publishes AMQP Messages
12
 */
13
class Producer extends BaseAmqp implements ProducerInterface
14
{
15
    public const DEFAULT_CONTENT_TYPE = 'text/plain';
16
    protected $contentType = Producer::DEFAULT_CONTENT_TYPE;
17
    protected $deliveryMode = 2;
18
    protected $defaultRoutingKey = '';
19
    protected $acknowledged = true;
20
    protected $confirmationTimeout = 0;
21
    protected $confirmSelect = false;
22
23
    public function setContentType($contentType)
24
    {
25
        $this->contentType = $contentType;
26
27
        return $this;
28
    }
29
30
    public function setDeliveryMode($deliveryMode)
31
    {
32
        $this->deliveryMode = $deliveryMode;
33
34
        return $this;
35
    }
36
37
    public function setDefaultRoutingKey($defaultRoutingKey)
38
    {
39
        $this->defaultRoutingKey = $defaultRoutingKey;
40
41
        return $this;
42
    }
43
44
    protected function getBasicProperties()
45
    {
46
        return array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode);
47
    }
48
49
    /**
50
     * @param null $confirmationTimeout
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $confirmationTimeout is correct as it would always require null to be passed?
Loading history...
51
     */
52
    public function setConfirmationTimeout($confirmationTimeout): void
53
    {
54
        $this->confirmationTimeout = intval($confirmationTimeout);
55
    }
56
57
    /**
58
     * @param AbstractConnection $conn
59
     * @param AMQPChannel|null $ch
60
     * @param null $consumerTag
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $consumerTag is correct as it would always require null to be passed?
Loading history...
61
     * @param bool $confirmSelect
62
     */
63
    public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $consumerTag = null, bool $confirmSelect = false)
64
    {
65
        parent::__construct($conn, $ch, $consumerTag);
66
        $this->confirmSelect = $confirmSelect;
67
        $this->initializeProducer();
68
    }
69
70
    /**
71
     * Publishes the message and merges additional properties with basic properties
72
     *
73
     * @param string $msgBody
74
     * @param string $routingKey
75
     * @param array $additionalProperties
76
     * @param array|null $headers
77
     * @return bool
78
     */
79
    public function publish($msgBody, $routingKey = '', $additionalProperties = array(), array $headers = null)
80
    {
81
        if ($this->autoSetupFabric) {
82
            $this->setupFabric();
83
        }
84
85
        $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));
86
87
        if (!empty($headers)) {
88
            $headersTable = new AMQPTable($headers);
89
            $msg->set('application_headers', $headersTable);
90
        }
91
92
        $real_routingKey = !empty($routingKey) ? $routingKey : $this->defaultRoutingKey;
93
        $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey);
94
        $this->getChannel()->wait_for_pending_acks($this->confirmationTimeout);
95
        $this->logger->debug('AMQP message published', array(
96
            'amqp' => array(
97
                'body' => $msgBody,
98
                'routingkeys' => $routingKey,
99
                'properties' => $additionalProperties,
100
                'headers' => $headers
101
            )
102
        ));
103
        return $this->acknowledged;
104
    }
105
106
    public function reconnect()
107
    {
108
        parent::reconnect();
109
        $this->initializeProducer();
110
    }
111
112
    /**
113
     */
114
    protected function initializeProducer(): void
115
    {
116
        if ($this->confirmSelect) {
117
            $this->getChannel()->confirm_select();
118
            $this->getChannel()->set_ack_handler(
119
                function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

119
                function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
120
                    $this->acknowledged = true;
121
                }
122
            );
123
124
            $this->getChannel()->set_nack_handler(
125
                function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

125
                function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
126
                    $this->acknowledged = false;
127
                }
128
            );
129
        }
130
    }
131
}
132