Passed
Pull Request — master (#662)
by
unknown
08:28
created

Producer   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 113
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 11
Bugs 0 Features 0
Metric Value
eloc 44
dl 0
loc 113
ccs 0
cts 34
cp 0
rs 10
c 11
b 0
f 0
wmc 13

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setDeliveryMode() 0 5 1
A getBasicProperties() 0 3 1
A reconnect() 0 4 1
A setDefaultRoutingKey() 0 5 1
A __construct() 0 5 1
A publish() 0 25 4
A initializeProducer() 0 13 2
A setContentType() 0 5 1
A setConfirmationTimeout() 0 3 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
10
/**
11
 * Producer, that publishes AMQP Messages
12
 */
13
class Producer extends BaseAmqp implements ProducerInterface
{
    /** @var string Value sent as the `content_type` message property. */
    protected $contentType = 'text/plain';

    /** @var int Value sent as the `delivery_mode` message property (2 is the AMQP "persistent" mode). */
    protected $deliveryMode = 2;

    /** @var string Routing key used by publish() when the caller does not supply one. */
    protected $defaultRoutingKey = '';

    /** @var bool Outcome recorded by the ack/nack handlers; returned by publish(). */
    protected $acknowledged = true;

    /** @var int Timeout handed to wait_for_pending_acks() after each publish. */
    protected $confirmationTimeout = 0;

    /** @var bool Whether the channel is switched to confirm mode in initializeProducer(). */
    protected $confirmSelect = false;

    /**
     * Sets the content type applied to every published message.
     *
     * @param string $contentType
     * @return $this
     */
    public function setContentType($contentType)
    {
        $this->contentType = $contentType;

        return $this;
    }

    /**
     * Sets the delivery mode applied to every published message.
     *
     * @param int $deliveryMode
     * @return $this
     */
    public function setDeliveryMode($deliveryMode)
    {
        $this->deliveryMode = $deliveryMode;

        return $this;
    }

    /**
     * Sets the routing key used when publish() is called without one.
     *
     * @param string $defaultRoutingKey
     * @return $this
     */
    public function setDefaultRoutingKey($defaultRoutingKey)
    {
        $this->defaultRoutingKey = $defaultRoutingKey;

        return $this;
    }

    /**
     * Returns the properties every message starts with; publish() merges the
     * caller's additional properties on top of these.
     *
     * @return array
     */
    protected function getBasicProperties()
    {
        return array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode);
    }

    /**
     * Sets the timeout used while waiting for pending acks after a publish.
     *
     * The value is normalised with intval(), so null becomes 0.
     *
     * @param int|string|null $confirmationTimeout
     */
    public function setConfirmationTimeout($confirmationTimeout): void
    {
        $this->confirmationTimeout = intval($confirmationTimeout);
    }

    /**
     * @param AbstractConnection $conn
     * @param AMQPChannel|null $ch
     * @param string|null $consumerTag Forwarded unchanged to the parent constructor.
     * @param bool $confirmSelect When true, the channel is put in confirm mode and
     *                            ack/nack handlers are registered.
     */
    public function __construct(AbstractConnection $conn, ?AMQPChannel $ch = null, $consumerTag = null, bool $confirmSelect = false)
    {
        parent::__construct($conn, $ch, $consumerTag);
        $this->confirmSelect = $confirmSelect;
        $this->initializeProducer();
    }

    /**
     * Publishes the message and merges additional properties with basic properties
     *
     * @param string $msgBody
     * @param string|null $routingKey Null or an empty string selects the default routing key.
     * @param array $additionalProperties Merged over the basic properties; same keys win.
     * @param array|null $headers Sent as `application_headers` when non-empty.
     * @return bool The current value of the acknowledged flag (see initializeProducer()).
     */
    public function publish($msgBody, $routingKey = '', $additionalProperties = array(), ?array $headers = null): bool
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));

        if (!empty($headers)) {
            $headersTable = new AMQPTable($headers);
            $msg->set('application_headers', $headersTable);
        }

        // The parameter defaults to '', so a null-only check would never fall back
        // to the configured default routing key. Treat '' the same as null; a
        // strict comparison keeps keys such as '0' usable.
        $useDefaultRoutingKey = $routingKey === null || $routingKey === '';
        $realRoutingKey = $useDefaultRoutingKey ? $this->defaultRoutingKey : $routingKey;

        $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $realRoutingKey);
        // The ack/nack handlers registered in initializeProducer() update
        // $this->acknowledged while this call runs.
        $this->getChannel()->wait_for_pending_acks($this->confirmationTimeout);
        $this->logger->debug('AMQP message published', array(
            'amqp' => array(
                'body' => $msgBody,
                'routingkeys' => $routingKey,
                'properties' => $additionalProperties,
                'headers' => $headers
            )
        ));

        return $this->acknowledged;
    }

    /**
     * Reconnects through the parent implementation, then repeats the producer
     * set-up on the channel.
     */
    public function reconnect()
    {
        parent::reconnect();
        $this->initializeProducer();
    }

    /**
     * When confirm mode was requested, enables it on the channel and registers
     * handlers that record whether the broker acked or nacked a message.
     * Does nothing otherwise.
     */
    protected function initializeProducer(): void
    {
        if ($this->confirmSelect) {
            $this->getChannel()->confirm_select();
            // The handlers receive the message but only the outcome is recorded.
            $this->getChannel()->set_ack_handler(
                function (AMQPMessage $message) {
                    $this->acknowledged = true;
                }
            );

            $this->getChannel()->set_nack_handler(
                function (AMQPMessage $message) {
                    $this->acknowledged = false;
                }
            );
        }
    }
}
131