Passed
Pull Request — master (#662)
by
unknown
13:11
created

Producer::reconnect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
ccs 0
cts 0
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
10
/**
11
 * Producer, that publishes AMQP Messages
12
 */
13
class Producer extends BaseAmqp implements ProducerInterface
14
{
15
    public const DEFAULT_CONTENT_TYPE = 'text/plain';
16
    protected $contentType = Producer::DEFAULT_CONTENT_TYPE;
17
    protected $deliveryMode = 2;
18
    protected $defaultRoutingKey = '';
19
    protected $acknowledged = true;
20
    protected $confirmationTimeout = 0;
21
    protected $confirmSelect = false;
22
23
    public function setContentType($contentType)
24
    {
25
        $this->contentType = $contentType;
26
27
        return $this;
28
    }
29
30
    public function setDeliveryMode($deliveryMode)
31
    {
32
        $this->deliveryMode = $deliveryMode;
33
34
        return $this;
35
    }
36
37
    public function setDefaultRoutingKey($defaultRoutingKey)
38
    {
39
        $this->defaultRoutingKey = $defaultRoutingKey;
40
41
        return $this;
42
    }
43
44
    protected function getBasicProperties()
45
    {
46
        return ['content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode];
47
    }
48
49
    public function setConfirmationTimeout(int $confirmationTimeout): void
50
    {
51
        $this->confirmationTimeout = $confirmationTimeout;
52
    }
53
54
    /**
55
     * @param AbstractConnection $conn
56
     * @param AMQPChannel|null $ch
57
     * @param null $consumerTag
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $consumerTag is correct as it would always require null to be passed?
Loading history...
58
     * @param bool $confirmSelect
59
     */
60
    public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $consumerTag = null, bool $confirmSelect = false)
61
    {
62
        parent::__construct($conn, $ch, $consumerTag);
63
        $this->confirmSelect = $confirmSelect;
64
        $this->initializeProducer();
65
    }
66
67
    /**
68
     * Publishes the message and merges additional properties with basic properties
69
     *
70
     * @param string $msgBody
71
     * @param string $routingKey
72
     * @param array $additionalProperties
73
     * @param array|null $headers
74
     * @return bool
75
     */
76
    public function publish($msgBody, $routingKey = null, $additionalProperties = [], array $headers = null)
77
    {
78
        if ($this->autoSetupFabric) {
79
            $this->setupFabric();
80
        }
81
82
        $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));
83
84
        if (!empty($headers)) {
85
            $headersTable = new AMQPTable($headers);
86
            $msg->set('application_headers', $headersTable);
87
        }
88
89
        $real_routingKey = !empty($routingKey) ? $routingKey : $this->defaultRoutingKey;
90
        $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey);
91
        $this->getChannel()->wait_for_pending_acks($this->confirmationTimeout);
92
        $this->logger->debug('AMQP message published', array(
93
            'amqp' => array(
94
                'body' => $msgBody,
95
                'routingkeys' => $routingKey,
96
                'properties' => $additionalProperties,
97
                'headers' => $headers
98
            )
99
        ));
100
        return $this->acknowledged;
101
    }
102
103
    public function reconnect()
104
    {
105
        parent::reconnect();
106
        $this->initializeProducer();
107
    }
108
109
    /**
110
     * Initializes the producer
111
     * If confirmation is enabled set confirm_select at the channel
112
     * and add ack and nack handlers
113
     */
114
    protected function initializeProducer(): void
115
    {
116
        if ($this->confirmSelect) {
117
            $this->getChannel()->confirm_select();
118
            $this->getChannel()->set_ack_handler(
119
                function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

119
                function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
120
                    $this->acknowledged = true;
121
                }
122
            );
123
124
            $this->getChannel()->set_nack_handler(
125
                function (AMQPMessage $message) {
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

125
                function (/** @scrutinizer ignore-unused */ AMQPMessage $message) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
126
                    $this->acknowledged = false;
127
                }
128
            );
129
        }
130
    }
131
}
132