Exchange::publish()   B
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 24
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 24
ccs 8
cts 8
cp 1
rs 8.9713
cc 2
eloc 17
nc 2
nop 5
crap 2
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use AMQPAL\Adapter\ExchangeInterface;
7
use AMQPAL\Options;
8
use AMQPAL\Exception as BaseException;
9
10
/**
11
 * Class Exchange
12
 *
13
 * @package AMQPAL\Adapter\PhpAmqpLib
14
 */
15
class Exchange implements ExchangeInterface
16
{
17
    /**
18
     * @var Channel
19
     */
20
    protected $channel;
21
    /**
22
     * @var Options\ExchangeOptions
23
     */
24
    protected $options;
25
26
    /**
27
     * @return Options\ExchangeOptions
28
     */
29 2
    public function getOptions()
30
    {
31 2
        return $this->options;
32
    }
33
34
    /**
35
     * @param Options\ExchangeOptions|\Traversable|array $exchangeOptions
36
     * @return $this
37
     * @throws BaseException\BadMethodCallException
38
     * @throws BaseException\InvalidArgumentException
39
     */
40 22
    public function setOptions($exchangeOptions)
41
    {
42 22
        if (!$exchangeOptions instanceof Options\ExchangeOptions) {
43 1
            $exchangeOptions = new Options\ExchangeOptions($exchangeOptions);
44
        }
45 22
        $this->options = $exchangeOptions;
46 22
        return $this;
47
    }
48
49
    /**
50
     * Declare a new exchange on the broker.
51
     *
52
     * @return $this
53
     */
54 8
    public function declareExchange()
55
    {
56 8
        $this->channel->getResource()->exchange_declare(
57 8
            $this->options->getName(),
58 8
            $this->options->getType(),
59 8
            $this->options->isPassive(),
60 8
            $this->options->isDurable(),
61 8
            $this->options->isAutoDelete(),
62 8
            $this->options->isInternal(),
63 8
            $this->options->isNoWait(),
64 8
            $this->options->getArguments()
65
        );
66
67 8
        return $this;
68
    }
69
70
    /**
71
     * Delete the exchange from the broker.
72
     *
73
     * @param bool   $ifUnused      Optional if the exchange should not be
74
     *                              deleted until no clients are connected to
75
     *                              it.
76
     * @param bool   $noWait        No wait for a reply
77
     *
78
     * @return $this
79
     */
80 4
    public function delete($ifUnused = false, $noWait = false)
81
    {
82 4
        $this->channel->getResource()->exchange_delete($this->options->getName(), $ifUnused, $noWait);
83
84 4
        return $this;
85
    }
86
87
    /**
88
     * Bind to another exchange.
89
     *
90
     * Bind an exchange to another exchange using the specified routing key.
91
     *
92
     * @param string $exchangeName Name of the exchange to bind.
93
     * @param string $routingKey   The routing key to use for binding.
94
     * @param bool   $noWait       No wait for a reply
95
     * @param array  $arguments    Additional binding arguments.
96
     *
97
     * @return $this
98
     */
99 2
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
100
    {
101 2
        if (null === $routingKey) {
102 1
            $routingKey = '';
103
        }
104 2
        $name = $this->options->getName();
105 2
        $this->channel->getResource()->exchange_bind($name, $exchangeName, $routingKey, $noWait, $arguments);
106
107 2
        return $this;
108
    }
109
110
    /**
111
     * Remove binding to another exchange.
112
     *
113
     * Remove a routing key binding on an another exchange from the given exchange.
114
     *
115
     * @param string $exchangeName Name of the exchange to bind.
116
     * @param string $routingKey   The routing key to use for binding.
117
     * @param array  $arguments    Additional binding arguments.
118
     *
119
     * @return $this
120
     */
121 2
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
122
    {
123 2
        if (null === $routingKey) {
124 1
            $routingKey = '';
125
        }
126 2
        $name = $this->options->getName();
127 2
        $this->channel->getResource()->exchange_unbind($name, $exchangeName, $routingKey, $arguments);
128
129 2
        return $this;
130
    }
131
132
    /**
133
     * Publish a message to an exchange.
134
     *
135
     * Publish a message to the exchange represented by the Exchange object.
136
     *
137
     * @param string $message      The message to publish.
138
     * @param string $routingKey   The optional routing key to which to
139
     *                             publish to.
140
     * @param bool   $mandatory    Mandatory
141
     * @param bool   $immediate    Immediate
142
     * @param array  $attributes   One of content_type, content_encoding,
143
     *                             message_id, user_id, app_id, delivery_mode,
144
     *                             priority, timestamp, expiration, type
145
     *                             or reply_to, headers.
146
     *
147
     * @return $this
148
     */
149 9
    public function publish(
150
        $message,
151
        $routingKey = null,
152
        $mandatory = false,
153
        $immediate = false,
154
        array $attributes = []
155
    ) {
156 9
        if (null === $routingKey) {
157 6
            $routingKey = '';
158
        }
159 9
        $options = $this->options;
160
161 9
        $AMQPMessage = new AMQPMessage($message, $attributes);
162
163 9
        $this->channel->getResource()->basic_publish(
164
            $AMQPMessage,
165 9
            $options->getName(),
166
            $routingKey,
167
            $mandatory,
168
            $immediate
169
        );
170
171 9
        return $this;
172
    }
173
174
    /**
175
     * @param Channel $channel
176
     * @return $this
177
     */
178 22
    public function setChannel(Channel $channel)
179
    {
180 22
        $this->channel = $channel;
181 22
        return $this;
182
    }
183
184
    /**
185
     * Get the Channel object in use
186
     *
187
     * @return Channel
188
     */
189 1
    public function getChannel()
190
    {
191 1
        return $this->channel;
192
    }
193
194
    /**
195
     * Get the Connection object in use
196
     *
197
     * @return Connection
198
     */
199 1
    public function getConnection()
200
    {
201 1
        return $this->channel->getConnection();
202
    }
203
}
204