1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace TreeHouse\Queue\Amqp\Driver\Amqp; |
4
|
|
|
|
5
|
|
|
use TreeHouse\Queue\Amqp\ChannelInterface; |
6
|
|
|
use TreeHouse\Queue\Amqp\ExchangeInterface; |
7
|
|
|
use TreeHouse\Queue\Exception\ChannelException; |
8
|
|
|
use TreeHouse\Queue\Exception\ConnectionException; |
9
|
|
|
use TreeHouse\Queue\Exception\ExchangeException; |
10
|
|
|
|
11
|
|
|
/**
 * ExchangeInterface implementation that forwards every call to an
 * \AMQPExchange instance (the "delegate") from the AMQP PHP extension.
 *
 * Flags are translated between the ExchangeInterface constants and the
 * extension's AMQP_* constants, and the extension's exchange/channel/connection
 * exceptions are rethrown as this library's own exception types.
 */
class Exchange implements ExchangeInterface
{
    /**
     * The wrapped extension object that performs the actual work.
     *
     * @var \AMQPExchange
     */
    protected $delegate;

    /**
     * The channel this exchange was created with.
     *
     * @var ChannelInterface
     */
    protected $channel;

    /**
     * Flag translation table: keys are this class' flag constants (self::*),
     * values are the matching AMQP_* constants of the extension.
     *
     * @var array<int, int>
     */
    protected static $flagMap = [
        self::NOPARAM    => AMQP_NOPARAM,
        self::DURABLE    => AMQP_DURABLE,
        self::PASSIVE    => AMQP_PASSIVE,
        self::AUTODELETE => AMQP_AUTODELETE,
        self::INTERNAL   => AMQP_INTERNAL,
        self::IFUNUSED   => AMQP_IFUNUSED,
        self::MANDATORY  => AMQP_MANDATORY,
        self::IMMEDIATE  => AMQP_IMMEDIATE,
        self::NOWAIT     => AMQP_NOWAIT,
    ];

    /**
     * @param \AMQPExchange    $delegate The extension exchange to wrap
     * @param ChannelInterface $channel  The owning channel; note that it is
     *                                   accepted and stored BY REFERENCE, so
     *                                   the property aliases the caller's
     *                                   variable rather than just the object
     */
    public function __construct(\AMQPExchange $delegate, ChannelInterface &$channel)
    {
        $this->delegate = $delegate;
        // Reference assignment is kept as-is to preserve existing behaviour:
        // reassigning the caller's variable is visible through $this->channel.
        $this->channel = &$channel;
    }

    /**
     * @inheritdoc
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * @inheritdoc
     */
    public function getConnection()
    {
        return $this->channel->getConnection();
    }

    /**
     * @inheritdoc
     */
    public function getName()
    {
        return $this->delegate->getName();
    }

    /**
     * @inheritdoc
     */
    public function getType()
    {
        return $this->delegate->getType();
    }

    /**
     * @inheritdoc
     */
    public function getFlags()
    {
        // The delegate reports AMQP_* flags; expose them as self::* flags.
        return self::convertFromDelegateFlags($this->delegate->getFlags());
    }

    /**
     * @inheritdoc
     */
    public function getArgument($key)
    {
        return $this->delegate->getArgument($key);
    }

    /**
     * @inheritdoc
     */
    public function getArguments()
    {
        return $this->delegate->getArguments();
    }

    /**
     * @inheritdoc
     *
     * @throws ExchangeException
     * @throws ChannelException
     * @throws ConnectionException
     */
    public function bind($exchangeName, $routingKey, array $arguments = [])
    {
        return $this->translateExceptions(function () use ($exchangeName, $routingKey, $arguments) {
            return $this->delegate->bind($exchangeName, $routingKey, $arguments);
        });
    }

    /**
     * @inheritdoc
     *
     * @throws ExchangeException
     * @throws ChannelException
     * @throws ConnectionException
     */
    public function unbind($exchangeName, $routingKey, array $arguments = [])
    {
        return $this->translateExceptions(function () use ($exchangeName, $routingKey, $arguments) {
            return $this->delegate->unbind($exchangeName, $routingKey, $arguments);
        });
    }

    /**
     * @inheritdoc
     *
     * @throws ExchangeException
     * @throws ChannelException
     * @throws ConnectionException
     */
    public function declareExchange()
    {
        return $this->translateExceptions(function () {
            return $this->delegate->declareExchange();
        });
    }

    /**
     * @inheritdoc
     *
     * @throws ExchangeException
     * @throws ChannelException
     * @throws ConnectionException
     */
    public function delete($exchangeName = null, $flags = null)
    {
        // Translate up front so the delegate only ever sees AMQP_* flags.
        $delegateFlags = self::convertToDelegateFlags($flags);

        return $this->translateExceptions(function () use ($exchangeName, $delegateFlags) {
            return $this->delegate->delete($exchangeName, $delegateFlags);
        });
    }

    /**
     * @inheritdoc
     *
     * @throws ExchangeException
     * @throws ChannelException
     * @throws ConnectionException
     */
    public function publish($message, $routingKey = null, $flags = null, array $attributes = [])
    {
        // Translate up front so the delegate only ever sees AMQP_* flags.
        $delegateFlags = self::convertToDelegateFlags($flags);

        return $this->translateExceptions(function () use ($message, $routingKey, $delegateFlags, $attributes) {
            return $this->delegate->publish($message, $routingKey, $delegateFlags, $attributes);
        });
    }

    /**
     * @inheritdoc
     *
     * @return \AMQPExchange
     */
    public function getDelegate()
    {
        return $this->delegate;
    }

    /**
     * Converts a bitmask of this class' flags (self::*) into the equivalent
     * bitmask of AMQP_* flags understood by the delegate.
     *
     * @param int|null $flags Bitmask of self::* flags, or null for "no flags"
     *
     * @return int Bitmask of AMQP_* flags (AMQP_NOPARAM when $flags is null)
     */
    public static function convertToDelegateFlags($flags = null)
    {
        if (null === $flags) {
            return AMQP_NOPARAM;
        }

        $converted = 0;

        foreach (self::$flagMap as $from => $to) {
            // Copy over each flag that is set in the input mask.
            if ($flags & $from) {
                $converted |= $to;
            }
        }

        return $converted;
    }

    /**
     * Converts a bitmask of AMQP_* flags, as used by the delegate, into the
     * equivalent bitmask of this class' flags (self::*).
     *
     * @param int|null $flags Bitmask of AMQP_* flags, or null for "no flags"
     *
     * @return int Bitmask of self::* flags (self::NOPARAM when $flags is null)
     */
    public static function convertFromDelegateFlags($flags = null)
    {
        if (null === $flags) {
            return self::NOPARAM;
        }

        $converted = 0;

        foreach (self::$flagMap as $from => $to) {
            // Same table, read in the opposite direction.
            if ($flags & $to) {
                $converted |= $from;
            }
        }

        return $converted;
    }

    /**
     * Runs an operation against the delegate and rethrows the extension's
     * exceptions as this library's exception types.
     *
     * This is the single place where the exception mapping lives; every
     * delegate call that can fail goes through it. The original exception is
     * always attached as the "previous" exception, and message and code are
     * carried over. Any other exception type propagates unchanged.
     *
     * @param callable $operation Zero-argument callable performing the delegate call
     *
     * @throws ExchangeException   When the delegate throws \AMQPExchangeException
     * @throws ChannelException    When the delegate throws \AMQPChannelException
     * @throws ConnectionException When the delegate throws \AMQPConnectionException
     *
     * @return mixed Whatever the operation returns
     */
    private function translateExceptions(callable $operation)
    {
        try {
            return $operation();
        } catch (\AMQPExchangeException $e) {
            throw new ExchangeException($e->getMessage(), $e->getCode(), $e);
        } catch (\AMQPChannelException $e) {
            throw new ChannelException($e->getMessage(), $e->getCode(), $e);
        } catch (\AMQPConnectionException $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
}
238
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.