Completed
Push — master ( 3a5f88...05006a )
by Thomas Mauro
10:18 queued 07:14
created

Exchange::configureExchange()   B

Complexity

Conditions 6
Paths 32

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 29
ccs 19
cts 19
cp 1
rs 8.439
cc 6
eloc 19
nc 32
nop 0
crap 6
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPExchange;
6
use AMQPAL\Adapter\ExchangeInterface;
7
use AMQPAL\Adapter\Exception;
8
use AMQPAL\Options;
9
10
/**
11
 * Class Exchange
12
 *
13
 * @package AMQPAL\Adapter\AMQP
14
 */
15
class Exchange implements ExchangeInterface
16
{
17
    /**
18
     * @var Channel
19
     */
20
    protected $channel;
21
    /**
22
     * @var AMQPExchange
23
     */
24
    protected $resource;
25
    /**
26
     * @var Options\ExchangeOptions
27
     */
28
    protected $options;
29
30
    /**
31
     * @return Options\ExchangeOptions
32
     */
33 11
    public function getOptions()
34
    {
35 11
        return $this->options;
36
    }
37
38
    /**
39
     * @param Options\ExchangeOptions $exchangeOptions
40
     * @return $this
41
     */
42 11
    public function setOptions(Options\ExchangeOptions $exchangeOptions)
43
    {
44 11
        $this->options = $exchangeOptions;
45 11
        $this->configureExchange();
46 11
        return $this;
47
    }
48
49
    /**
50
     * @return AMQPExchange
51
     */
52 11
    public function getResource()
53
    {
54 11
        return $this->resource;
55
    }
56
57
    /**
58
     * @param AMQPExchange $resource
59
     * @return $this
60
     */
61 11
    public function setResource(AMQPExchange $resource)
62
    {
63 11
        $this->resource = $resource;
64 11
        return $this;
65
    }
66
67
    /**
68
     * @return $this
69
     */
70 11
    protected function configureExchange()
71
    {
72 11
        $options = $this->getOptions();
73 11
        $exchange = $this->getResource();
74
75 11
        $flags = AMQP_NOPARAM;
76 11
        if ($options->isDurable()) {
77 11
            $flags |= AMQP_DURABLE;
78
        }
79 11
        if ($options->isPassive()) {
80 11
            $flags |= AMQP_PASSIVE;
81
        }
82 11
        if ($options->isAutoDelete()) {
83 11
            $flags |= AMQP_AUTODELETE;
84
        }
85 11
        if ($options->isInternal()) {
86 11
            $flags |= AMQP_INTERNAL;
87
        }
88 11
        if ($options->isNoWait()) {
89 11
            $flags |= AMQP_NOWAIT;
90
        }
91
92 11
        $exchange->setType($options->getType());
93 11
        $exchange->setName($options->getName());
94 11
        $exchange->setFlags($flags);
95 11
        $exchange->setArguments($options->getArguments());
96
97 11
        return $this;
98
    }
99
100
    /**
101
     * Declare a new exchange on the broker.
102
     *
103
     * @return $this
104
     * @throws Exception\RuntimeException
105
     * @throws \AMQPExchangeException
106
     * @throws \AMQPChannelException
107
     * @throws \AMQPConnectionException
108
     */
109 1
    public function declareExchange()
110
    {
111 1
        $this->resource->declareExchange();
112
113 1
        return $this;
114
    }
115
116
    /**
117
     * Delete the exchange from the broker.
118
     *
119
     * @param bool   $ifUnused      Optional if the exchange should not be
120
     *                              deleted until no clients are connected to
121
     *                              it.
122
     * @param bool   $noWait        No wait for a reply
123
     *
124
     * @return $this
125
     * @throws \AMQPExchangeException
126
     * @throws \AMQPChannelException
127
     * @throws \AMQPConnectionException
128
     */
129 4
    public function delete($ifUnused = false, $noWait = false)
130
    {
131 4
        $flags = AMQP_NOPARAM;
132 4
        if ($ifUnused) {
133 2
            $flags |= AMQP_IFUNUSED;
134
        }
135 4
        if ($noWait) {
136 2
            $flags |= AMQP_NOWAIT;
137
        }
138
139 4
        $this->resource->delete($this->options->getName(), $flags);
140
141 4
        return $this;
142
    }
143
144
    /**
145
     * Bind to another exchange.
146
     *
147
     * Bind an exchange to another exchange using the specified routing key.
148
     *
149
     * @param string $exchangeName Name of the exchange to bind.
150
     * @param string $routingKey   The routing key to use for binding.
151
     * @param bool   $noWait       No wait for a reply
152
     * @param array  $arguments    Additional binding arguments.
153
     *
154
     * @return $this
155
     * @throws \AMQPExchangeException
156
     * @throws \AMQPChannelException
157
     * @throws \AMQPConnectionException
158
     */
159 1
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
160
    {
161 1
        $this->resource->bind($exchangeName, $routingKey, $arguments);
162
163 1
        return $this;
164
    }
165
166
    /**
167
     * Remove binding to another exchange.
168
     *
169
     * Remove a routing key binding on an another exchange from the given exchange.
170
     *
171
     * @param string $exchangeName Name of the exchange to bind.
172
     * @param string $routingKey   The routing key to use for binding.
173
     * @param array  $arguments    Additional binding arguments.
174
     *
175
     * @return $this
176
     * @throws \AMQPExchangeException
177
     * @throws \AMQPChannelException
178
     * @throws \AMQPConnectionException
179
     */
180 1
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
181
    {
182 1
        $this->resource->unbind($exchangeName, $routingKey, $arguments);
183
184 1
        return $this;
185
    }
186
187
    /**
188
     * Publish a message to an exchange.
189
     *
190
     * Publish a message to the exchange represented by the Exchange object.
191
     *
192
     * @param string $message      The message to publish.
193
     * @param string $routingKey   The optional routing key to which to
194
     *                             publish to.
195
     * @param bool   $mandatory    Mandatory
196
     * @param bool   $immediate    Immediate
197
     * @param array  $attributes   One of content_type, content_encoding,
198
     *                             message_id, user_id, app_id, delivery_mode,
199
     *                             priority, timestamp, expiration, type
200
     *                             or reply_to, headers.
201
     *
202
     * @return $this
203
     * @throws \AMQPExchangeException
204
     * @throws \AMQPChannelException
205
     * @throws \AMQPConnectionException
206
     */
207 4
    public function publish(
208
        $message,
209
        $routingKey = null,
210
        $mandatory = false,
211
        $immediate = false,
212
        array $attributes = []
213
    ) {
214 4
        $flags = AMQP_NOPARAM;
215 4
        if ($mandatory) {
216 2
            $flags |= AMQP_MANDATORY;
217
        }
218 4
        if ($immediate) {
219 2
            $flags |= AMQP_IMMEDIATE;
220
        }
221
222 4
        $this->resource->publish($message, $routingKey, $flags, $attributes);
223
224 4
        return $this;
225
    }
226
227
    /**
228
     * Get the Connection object in use
229
     *
230
     * @return Connection
231
     */
232
    public function getConnection()
233
    {
234
        return $this->getChannel()->getConnection();
235
    }
236
237
    /**
238
     * @param Channel $channel
239
     * @return $this
240
     */
241
    public function setChannel(Channel $channel)
242
    {
243
        $this->channel = $channel;
244
        return $this;
245
    }
246
247
    /**
248
     * Get the Channel object in use
249
     *
250
     * @return Channel
251
     */
252
    public function getChannel()
253
    {
254
        return $this->channel;
255
    }
256
}
257