1 | <?php |
||
43 | class AmqpTerminal implements TerminalInterface, LoggerAwareInterface |
||
|
|||
44 | { |
||
45 | |||
46 | const DEFAULT_EXCHANGE_NAME = "Governor.EventBus"; |
||
47 | |||
48 | /** |
||
49 | * @var LoggerInterface |
||
50 | */ |
||
51 | private $logger; |
||
52 | |||
53 | /** |
||
54 | * @var AMQPConnection |
||
55 | */ |
||
56 | private $connection; |
||
57 | |||
58 | /** |
||
59 | * @var string |
||
60 | */ |
||
61 | private $exchangeName = self::DEFAULT_EXCHANGE_NAME; |
||
62 | |||
63 | /** |
||
64 | * @var boolean |
||
65 | */ |
||
66 | private $isTransactional = false; |
||
67 | |||
68 | /** |
||
69 | * @var boolean |
||
70 | */ |
||
71 | private $isDurable = true; |
||
72 | // private ListenerContainerLifecycleManager listenerContainerLifecycleManager; |
||
73 | |||
74 | /** |
||
75 | * @var AMQPMessageConverterInterface |
||
76 | */ |
||
77 | private $messageConverter; |
||
78 | |||
79 | /** |
||
80 | * @var SerializerInterface |
||
81 | */ |
||
82 | private $serializer; |
||
83 | |||
84 | /** |
||
85 | * @var RoutingKeyResolverInterface |
||
86 | */ |
||
87 | private $routingKeyResolver; |
||
88 | |||
89 | /** |
||
90 | * @var boolean |
||
91 | */ |
||
92 | private $waitForAck; |
||
93 | |||
94 | /** |
||
95 | * @var integer |
||
96 | */ |
||
97 | private $publisherAckTimeout = 0; |
||
98 | |||
99 | 8 | public function __construct( |
|
111 | |||
112 | 5 | private function tryClose(AMQPChannel $channel) |
|
120 | |||
121 | /** |
||
122 | * Does the actual publishing of the given <code>body</code> on the given <code>channel</code>. This method can be |
||
123 | * overridden to change the properties used to send a message. |
||
124 | * |
||
125 | * @param AMQPChannel $channel The channel to dispatch the message on |
||
126 | * @param AmqpMessage $amqpMessage The AMQPMessage describing the characteristics of the message to publish |
||
127 | */ |
||
128 | 6 | protected function doSendMessage( |
|
153 | |||
154 | private function tryRollback(AMQPChannel $channel) |
||
162 | |||
163 | /** |
||
164 | * Sets the Connection providing the Channels to send messages on. |
||
165 | * <p/> |
||
166 | * |
||
167 | * @param AMQPConnection $connection The connection to set |
||
168 | */ |
||
169 | 8 | public function setConnection(AMQPConnection $connection) |
|
173 | |||
174 | /** |
||
175 | * Whether this Terminal should dispatch its Events in a transaction or not. Defaults to <code>false</code>. |
||
176 | * <p/> |
||
177 | * If a delegate Terminal is configured, the transaction will be committed <em>after</em> the delegate has |
||
178 | * dispatched the events. |
||
179 | * <p/> |
||
180 | * Transactional behavior cannot be enabled if {@link #setWaitForPublisherAck(boolean)} has been set to |
||
181 | * <code>true</code>. |
||
182 | * |
||
183 | * @param boolean $transactional whether dispatching should be transactional or not |
||
184 | */ |
||
185 | 8 | public function setTransactional($transactional) |
|
193 | |||
194 | /** |
||
195 | * Enables or diables the RabbitMQ specific publisher acknowledgements (confirms). When confirms are enabled, the |
||
196 | * terminal will wait until the server has acknowledged the reception (or fsync to disk on persistent messages) of |
||
197 | * all published messages. |
||
198 | * <p/> |
||
199 | * Server ACKS cannot be enabled when transactions are enabled. |
||
200 | * <p/> |
||
201 | * See <a href="http://www.rabbitmq.com/confirms.html">RabbitMQ Documentation</a> for more information about |
||
202 | * publisher acknowledgements. |
||
203 | * |
||
204 | * @param boolean $waitForPublisherAck whether or not to enab;e server acknowledgements (confirms) |
||
205 | */ |
||
206 | 4 | public function setWaitForPublisherAck($waitForPublisherAck) |
|
214 | |||
215 | /** |
||
216 | * Sets the maximum amount of time (in milliseconds) the publisher may wait for the acknowledgement of published |
||
217 | * messages. If not all messages have been acknowledged withing this time, the publication will throw an |
||
218 | * EventPublicationFailedException. |
||
219 | * <p/> |
||
220 | * This setting is only used when {@link #setWaitForPublisherAck(boolean)} is set to <code>true</code>. |
||
221 | * |
||
222 | * @param integer $publisherAckTimeout The number of milliseconds to wait for confirms, or 0 to wait indefinitely. |
||
223 | */ |
||
224 | 1 | public function setPublisherAckTimeout($publisherAckTimeout) |
|
228 | |||
229 | /* |
||
230 | * Sets the Message Converter that creates AMQP Messages from Event Messages and vice versa. Setting this property |
||
231 | * will ignore the "durable", "serializer" and "routingKeyResolver" properties, which just act as short hands to |
||
232 | * create a DefaultAMQPMessageConverter instance. |
||
233 | * <p/> |
||
234 | * Defaults to a DefaultAMQPMessageConverter. |
||
235 | * |
||
236 | * @param messageConverter The message converter to convert AMQP Messages to Event Messages and vice versa. |
||
237 | */ |
||
238 | // public void setMessageConverter(AMQPMessageConverter messageConverter) { |
||
239 | // this.messageConverter = messageConverter; |
||
240 | // } |
||
241 | |||
242 | /** |
||
243 | * Whether or not messages should be marked as "durable" when sending them out. Durable messages suffer from a |
||
244 | * performance penalty, but will survive a reboot of the Message broker that stores them. |
||
245 | * <p/> |
||
246 | * By default, messages are durable. |
||
247 | * <p/> |
||
248 | * Note that this setting is ignored if a {@link |
||
249 | * #setMessageConverter(org.axonframework.eventhandling.amqp.AMQPMessageConverter) MessageConverter} is provided. |
||
250 | * In that case, the message converter must add the properties to reflect the required durability setting. |
||
251 | * |
||
252 | * @param boolean $durable whether or not messages should be durable |
||
253 | */ |
||
254 | public function setDurable($durable) |
||
258 | |||
259 | /** |
||
260 | * Sets the name of the exchange to dispatch published messages to. Defaults to "{@value #DEFAULT_EXCHANGE_NAME}". |
||
261 | * |
||
262 | * @param string $exchangeName the name of the exchange to dispatch messages to |
||
263 | */ |
||
264 | 8 | public function setExchangeName($exchangeName) |
|
268 | |||
269 | 6 | public function publish(array $events) |
|
318 | |||
319 | /** |
||
320 | * @param LoggerInterface $logger |
||
321 | * @return null |
||
322 | */ |
||
323 | 8 | public function setLogger(LoggerInterface $logger) |
|
327 | |||
328 | /** |
||
329 | * Sets the RoutingKeyResolver that provides the Routing Key for each message to dispatch. |
||
330 | * |
||
331 | * @param RoutingKeyResolverInterface $routingKeyResolver the RoutingKeyResolver to use |
||
332 | */ |
||
333 | public function setRoutingKeyResolver(RoutingKeyResolverInterface $routingKeyResolver) |
||
337 | |||
338 | } |
||
339 |