Completed
Push — master ( f55883...4285c1 )
by Thomas Mauro
13:12 queued 10:06
created

Exchange   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 242
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 2

Test Coverage

Coverage 100%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 22
c 2
b 0
f 0
lcom 2
cbo 2
dl 0
loc 242
ccs 62
cts 62
cp 1
rs 10

13 Methods

Rating   Name   Duplication   Size   Complexity  
A getConnection() 0 4 1
A setChannel() 0 5 1
A getChannel() 0 4 1
A getOptions() 0 4 1
A setOptions() 0 6 1
A getResource() 0 4 1
A setResource() 0 5 1
B configureExchange() 0 29 6
A declareExchange() 0 6 1
A delete() 0 14 3
A bind() 0 6 1
A unbind() 0 6 1
A publish() 0 19 3
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPExchange;
6
use AMQPAL\Adapter\ExchangeInterface;
7
use AMQPAL\Adapter\Exception;
8
use AMQPAL\Options;
9
10
/**
11
 * Class Exchange
12
 *
13
 * @package AMQPAL\Adapter\AMQP
14
 */
15
class Exchange implements ExchangeInterface
{
    /**
     * Channel wrapper this exchange was attached to via setChannel().
     *
     * @var Channel
     */
    protected $channel;

    /**
     * Underlying extension exchange object that every broker call is
     * delegated to.
     *
     * @var AMQPExchange
     */
    protected $resource;

    /**
     * Options last supplied through setOptions().
     *
     * @var Options\ExchangeOptions
     */
    protected $options;

    /**
     * Return the options currently stored on this exchange.
     *
     * @return Options\ExchangeOptions
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * Store the given options and immediately push them onto the
     * underlying resource through configureExchange().
     *
     * @param Options\ExchangeOptions $exchangeOptions
     * @return $this
     */
    public function setOptions(Options\ExchangeOptions $exchangeOptions)
    {
        $this->options = $exchangeOptions;

        // Keep the resource in sync with the freshly stored options.
        $this->configureExchange();

        return $this;
    }

    /**
     * Return the underlying exchange resource.
     *
     * @return AMQPExchange
     */
    public function getResource()
    {
        return $this->resource;
    }

    /**
     * Replace the underlying exchange resource.
     *
     * @param AMQPExchange $resource
     * @return $this
     */
    public function setResource(AMQPExchange $resource)
    {
        $this->resource = $resource;

        return $this;
    }

    /**
     * Copy type, name, flags and arguments from the stored options onto
     * the underlying resource.
     *
     * The flag mask starts from AMQP_NOPARAM and gets one bit OR-ed in for
     * each boolean option that is truthy (durable, passive, auto-delete,
     * internal, no-wait).
     *
     * @return $this
     */
    protected function configureExchange()
    {
        $settings = $this->getOptions();
        $target = $this->getResource();

        // Each option contributes its bit when truthy, or the neutral
        // AMQP_NOPARAM value otherwise.
        $mask = AMQP_NOPARAM
            | ($settings->isDurable() ? AMQP_DURABLE : AMQP_NOPARAM)
            | ($settings->isPassive() ? AMQP_PASSIVE : AMQP_NOPARAM)
            | ($settings->isAutoDelete() ? AMQP_AUTODELETE : AMQP_NOPARAM)
            | ($settings->isInternal() ? AMQP_INTERNAL : AMQP_NOPARAM)
            | ($settings->isNoWait() ? AMQP_NOWAIT : AMQP_NOPARAM);

        $target->setType($settings->getType());
        $target->setName($settings->getName());
        $target->setFlags($mask);
        $target->setArguments($settings->getArguments());

        return $this;
    }

    /**
     * Declare the exchange on the broker by delegating to the underlying
     * resource.
     *
     * @return $this
     * @throws Exception\RuntimeException Listed by the original contract;
     *                                    not thrown directly by this method.
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function declareExchange()
    {
        $this->resource->declareExchange();

        return $this;
    }

    /**
     * Delete the exchange from the broker.
     *
     * The exchange name is taken from the stored options; the two boolean
     * arguments are translated into a flag mask for the resource call.
     *
     * @param bool $ifUnused When truthy, AMQP_IFUNUSED is added to the flags
     *                       (delete only once the exchange is unused).
     * @param bool $noWait   When truthy, AMQP_NOWAIT is added to the flags
     *                       (do not wait for a reply).
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function delete($ifUnused = false, $noWait = false)
    {
        $mask = AMQP_NOPARAM
            | ($ifUnused ? AMQP_IFUNUSED : AMQP_NOPARAM)
            | ($noWait ? AMQP_NOWAIT : AMQP_NOPARAM);

        $this->resource->delete($this->options->getName(), $mask);

        return $this;
    }

    /**
     * Bind this exchange to another exchange.
     *
     * The exchange name, routing key and arguments are handed to the
     * underlying resource unchanged.
     *
     * @param string $exchangeName Name of the exchange to bind.
     * @param string $routingKey   The routing key to use for binding.
     * @param bool   $noWait       Accepted for interface compatibility; it is
     *                             not forwarded to the underlying resource.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $this->resource->bind($exchangeName, $routingKey, $arguments);

        return $this;
    }

    /**
     * Remove a binding to another exchange.
     *
     * The exchange name, routing key and arguments are handed to the
     * underlying resource unchanged.
     *
     * @param string $exchangeName Name of the exchange to unbind.
     * @param string $routingKey   The routing key of the binding to remove.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $this->resource->unbind($exchangeName, $routingKey, $arguments);

        return $this;
    }

    /**
     * Publish a message through the underlying exchange resource.
     *
     * The two boolean arguments are translated into a flag mask; message,
     * routing key and attributes are passed on as given.
     *
     * @param string $message    The message to publish.
     * @param string $routingKey The optional routing key to publish to.
     * @param bool   $mandatory  When truthy, AMQP_MANDATORY is added to the
     *                           flags.
     * @param bool   $immediate  When truthy, AMQP_IMMEDIATE is added to the
     *                           flags.
     * @param array  $attributes One of content_type, content_encoding,
     *                           message_id, user_id, app_id, delivery_mode,
     *                           priority, timestamp, expiration, type
     *                           or reply_to, headers.
     *
     * @return $this
     * @throws \AMQPExchangeException
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function publish(
        $message,
        $routingKey = null,
        $mandatory = false,
        $immediate = false,
        array $attributes = []
    ) {
        $mask = AMQP_NOPARAM
            | ($mandatory ? AMQP_MANDATORY : AMQP_NOPARAM)
            | ($immediate ? AMQP_IMMEDIATE : AMQP_NOPARAM);

        $this->resource->publish($message, $routingKey, $mask, $attributes);

        return $this;
    }

    /**
     * Return the connection of the channel this exchange is attached to.
     *
     * @return Connection
     */
    public function getConnection()
    {
        $channel = $this->getChannel();

        return $channel->getConnection();
    }

    /**
     * Attach this exchange to the given channel.
     *
     * @param Channel $channel
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }

    /**
     * Return the channel this exchange is attached to.
     *
     * @return Channel
     */
    public function getChannel()
    {
        return $this->channel;
    }
}
257