Exchange   A
last analyzed

Complexity

Total Complexity 23

Size/Duplication

Total Lines 247
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 2

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 1 Features 0
Metric Value
wmc 23
c 3
b 1
f 0
lcom 2
cbo 2
dl 0
loc 247
ccs 64
cts 64
cp 1
rs 10

13 Methods

Rating   Name   Duplication   Size   Complexity  
A getOptions() 0 4 1
A setOptions() 0 9 2
A getResource() 0 4 1
A setResource() 0 5 1
B configureExchange() 0 29 6
A declareExchange() 0 6 1
A delete() 0 14 3
A bind() 0 6 1
A unbind() 0 6 1
A publish() 0 19 3
A getConnection() 0 4 1
A setChannel() 0 5 1
A getChannel() 0 4 1
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPExchange;
6
use AMQPAL\Adapter\ExchangeInterface;
7
use AMQPAL\Adapter\Exception;
8
use AMQPAL\Options;
9
use AMQPAL\Exception as BaseException;
10
11
/**
12
 * Class Exchange
13
 *
14
 * @package AMQPAL\Adapter\AMQP
15
 */
16
class Exchange implements ExchangeInterface
{
    /**
     * Channel this exchange is attached to (assigned through setChannel()).
     *
     * @var Channel
     */
    protected $channel;

    /**
     * Underlying extension exchange object every broker operation is delegated to.
     *
     * @var AMQPExchange
     */
    protected $resource;

    /**
     * Options last applied through setOptions().
     *
     * @var Options\ExchangeOptions
     */
    protected $options;

    /**
     * Return the options currently stored on this exchange.
     *
     * @return Options\ExchangeOptions
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * Store the exchange options and push them onto the underlying resource.
     *
     * Anything that is not already an Options\ExchangeOptions instance is
     * handed to the Options\ExchangeOptions constructor first.
     *
     * @param Options\ExchangeOptions|\Traversable|array $exchangeOptions
     * @return $this
     * @throws BaseException\BadMethodCallException
     * @throws BaseException\InvalidArgumentException
     */
    public function setOptions($exchangeOptions)
    {
        $this->options = $exchangeOptions instanceof Options\ExchangeOptions
            ? $exchangeOptions
            : new Options\ExchangeOptions($exchangeOptions);

        // The resource is reconfigured on every call so it mirrors the new options.
        $this->configureExchange();

        return $this;
    }

    /**
     * Return the wrapped extension exchange object.
     *
     * @return AMQPExchange
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Replace the wrapped extension exchange object.
     *
     * @param AMQPExchange $resource
     * @return $this
     */
    public function setResource(AMQPExchange $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Copy type, name, flags and arguments from the options onto the resource.
     *
     * The flag bitmask starts at AMQP_NOPARAM and gains one bit per enabled
     * boolean option (durable, passive, auto-delete, internal, no-wait).
     *
     * @return $this
     */
    protected function configureExchange()
    {
        $opts = $this->getOptions();
        $target = $this->getResource();

        // Each disabled option contributes AMQP_NOPARAM, leaving the mask unchanged.
        $bitmask = AMQP_NOPARAM
            | ($opts->isDurable() ? AMQP_DURABLE : AMQP_NOPARAM)
            | ($opts->isPassive() ? AMQP_PASSIVE : AMQP_NOPARAM)
            | ($opts->isAutoDelete() ? AMQP_AUTODELETE : AMQP_NOPARAM)
            | ($opts->isInternal() ? AMQP_INTERNAL : AMQP_NOPARAM)
            | ($opts->isNoWait() ? AMQP_NOWAIT : AMQP_NOPARAM);

        $target->setType($opts->getType());
        $target->setName($opts->getName());
        $target->setFlags($bitmask);
        $target->setArguments($opts->getArguments());

        return $this;
    }

    /**
     * Declare the exchange by delegating to the underlying resource.
     *
     * @return $this
     * @throws Exception\RuntimeException
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function declareExchange()
    {
        $this->resource->declareExchange();

        return $this;
    }

    /**
     * Delete the exchange named in the current options.
     *
     * @param bool $ifUnused When truthy, AMQP_IFUNUSED is added to the flags.
     * @param bool $noWait   When truthy, AMQP_NOWAIT is added to the flags.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function delete($ifUnused = false, $noWait = false)
    {
        $bitmask = AMQP_NOPARAM
            | ($ifUnused ? AMQP_IFUNUSED : AMQP_NOPARAM)
            | ($noWait ? AMQP_NOWAIT : AMQP_NOPARAM);

        $this->resource->delete($this->options->getName(), $bitmask);

        return $this;
    }

    /**
     * Bind this exchange to another exchange.
     *
     * Note: $noWait is part of the signature but is not forwarded to the
     * underlying resource by this method.
     *
     * @param string $exchangeName Name of the exchange to bind.
     * @param string $routingKey   Routing key used for the binding.
     * @param bool   $noWait       Currently unused by this implementation.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $this->resource->bind($exchangeName, $routingKey, $arguments);

        return $this;
    }

    /**
     * Remove a binding between this exchange and another exchange.
     *
     * @param string $exchangeName Name of the exchange to unbind.
     * @param string $routingKey   Routing key of the binding to remove.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $this->resource->unbind($exchangeName, $routingKey, $arguments);

        return $this;
    }

    /**
     * Publish a message through the underlying resource.
     *
     * @param string $message    The message to publish.
     * @param string $routingKey Optional routing key to publish to.
     * @param bool   $mandatory  When truthy, AMQP_MANDATORY is added to the flags.
     * @param bool   $immediate  When truthy, AMQP_IMMEDIATE is added to the flags.
     * @param array  $attributes Message attributes (content_type, content_encoding,
     *                           message_id, user_id, app_id, delivery_mode,
     *                           priority, timestamp, expiration, type,
     *                           reply_to, headers), passed through unchanged.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function publish(
        $message,
        $routingKey = null,
        $mandatory = false,
        $immediate = false,
        array $attributes = []
    ) {
        $bitmask = AMQP_NOPARAM
            | ($mandatory ? AMQP_MANDATORY : AMQP_NOPARAM)
            | ($immediate ? AMQP_IMMEDIATE : AMQP_NOPARAM);

        $this->resource->publish($message, $routingKey, $bitmask, $attributes);

        return $this;
    }

    /**
     * Return the connection reported by the channel in use.
     *
     * @return Connection
     */
    public function getConnection()
    {
        $activeChannel = $this->getChannel();

        return $activeChannel->getConnection();
    }

    /**
     * Attach the channel this exchange operates on.
     *
     * @param Channel $channel
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }

    /**
     * Return the channel in use.
     *
     * @return Channel
     */
    public function getChannel()
    {
        return $this->channel;
    }
}
263