AmqpTerminal::publish()   F
last analyzed

Complexity

Conditions 11
Paths 409

Size

Total Lines 49
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 31
CRAP Score 12.044

Importance

Changes 8
Bugs 3 Features 0
Metric Value
c 8
b 3
f 0
dl 0
loc 49
ccs 31
cts 39
cp 0.7949
rs 3.7726
cc 11
eloc 30
nc 409
nop 1
crap 12.044

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/*
4
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
5
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
6
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
7
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
8
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
9
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
10
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
11
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
12
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
13
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
14
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
15
 *
16
 * The software is based on the Axon Framework project which is
17
 * licensed under the Apache 2.0 license. For more information on the Axon Framework
18
 * see <http://www.axonframework.org/>.
19
 * 
20
 * This software consists of voluntary contributions made by many individuals
21
 * and is licensed under the MIT license. For more information, see
22
 * <http://www.governor-framework.org/>.
23
 */
24
25
namespace Governor\Framework\EventHandling\Amqp;
26
27
use Governor\Framework\Common\Logging\NullLogger;
28
use Governor\Framework\EventHandling\TerminalInterface;
29
use PhpAmqpLib\Connection\AMQPConnection;
30
use PhpAmqpLib\Channel\AMQPChannel;
31
use PhpAmqpLib\Message\AMQPMessage as RawMessage;
32
use Psr\Log\LoggerAwareInterface;
33
use Psr\Log\LoggerInterface;
34
use Governor\Framework\Serializer\SerializerInterface;
35
use Governor\Framework\UnitOfWork\CurrentUnitOfWork;
36
37
/**
38
 * Implementation of the {@see TerminalInterface} supporting the AMQP protocol.
39
 *
40
 * @author    "David Kalosi" <[email protected]>
41
 * @license   <a href="http://www.opensource.org/licenses/mit-license.php">MIT License</a>
42
 */
43
class AmqpTerminal implements TerminalInterface, LoggerAwareInterface
{

    const DEFAULT_EXCHANGE_NAME = "Governor.EventBus";

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Connection used to open one channel per publish() call.
     *
     * @var AMQPConnection
     */
    private $connection;

    /**
     * @var string
     */
    private $exchangeName = self::DEFAULT_EXCHANGE_NAME;

    /**
     * @var boolean
     */
    private $isTransactional = false;

    /**
     * @var boolean
     */
    private $isDurable = true;

    /**
     * @var AmqpMessageConverterInterface
     */
    private $messageConverter;

    /**
     * True while $messageConverter is the DefaultAmqpMessageConverter built by this class,
     * false when a converter was supplied through the constructor.
     *
     * @var boolean
     */
    private $usesDefaultConverter;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var RoutingKeyResolverInterface
     */
    private $routingKeyResolver;

    /**
     * @var boolean
     */
    private $waitForAck = false;

    /**
     * @var integer
     */
    private $publisherAckTimeout = 0;

    /**
     * Creates a terminal that logs to a NullLogger and resolves routing keys with a
     * NamespaceRoutingKeyResolver until configured otherwise.
     *
     * @param SerializerInterface $serializer Serializer handed to the default message converter
     * @param AmqpMessageConverterInterface|null $messageConverter Converter to use; when omitted a
     * DefaultAmqpMessageConverter is built from the serializer, routing key resolver and durable flag
     */
    public function __construct(
        SerializerInterface $serializer,
        AmqpMessageConverterInterface $messageConverter = null
    ) {
        $this->serializer = $serializer;
        $this->logger = new NullLogger();
        $this->routingKeyResolver = new NamespaceRoutingKeyResolver();
        $this->usesDefaultConverter = null === $messageConverter;
        $this->messageConverter = $this->usesDefaultConverter
            ? $this->createDefaultMessageConverter()
            : $messageConverter;
    }

    /**
     * Builds the default converter from the current serializer, routing key resolver and durable flag.
     *
     * @return DefaultAmqpMessageConverter
     */
    private function createDefaultMessageConverter()
    {
        return new DefaultAmqpMessageConverter(
            $this->serializer,
            $this->routingKeyResolver,
            $this->isDurable
        );
    }

    /**
     * Closes the given channel, logging instead of failing when the close itself throws.
     *
     * @param AMQPChannel $channel The channel to close
     */
    private function tryClose(AMQPChannel $channel)
    {
        try {
            $channel->close();
        } catch (\Exception $ex) {
            $this->logger->info(
                "Unable to close channel. It might already be closed.",
                array('exception' => $ex)
            );
        }
    }

    /**
     * Rolls back the transaction on the given channel, logging instead of failing when the
     * rollback itself throws.
     *
     * @param AMQPChannel $channel The channel to roll back
     */
    private function tryRollback(AMQPChannel $channel)
    {
        try {
            $channel->tx_rollback();
        } catch (\Exception $ex) {
            $this->logger->debug(
                "Unable to rollback. The underlying channel might already be closed.",
                array('exception' => $ex)
            );
        }
    }

    /**
     * Does the actual publishing of the given message on the given channel. This method can be
     * overridden to change the properties used to send a message.
     *
     * @param AMQPChannel $channel The channel to dispatch the message on
     * @param AmqpMessage $amqpMessage The AmqpMessage describing the characteristics of the message to publish
     */
    protected function doSendMessage(
        AMQPChannel $channel,
        AmqpMessage $amqpMessage
    ) {
        $routingKey = $amqpMessage->getRoutingKey();
        $rawMessage = new RawMessage(
            $amqpMessage->getBody(),
            $amqpMessage->getProperties()
        );

        $this->logger->debug(
            "Publishing message to {exchange} with routing key {key}",
            array(
                'exchange' => $this->exchangeName,
                'key' => $routingKey
            )
        );

        $channel->basic_publish(
            $rawMessage,
            $this->exchangeName,
            $routingKey,
            $amqpMessage->isMandatory(),
            $amqpMessage->isImmediate()
        );
    }

    /**
     * Sets the Connection providing the Channels to send messages on.
     *
     * @param AMQPConnection $connection The connection to set
     */
    public function setConnection(AMQPConnection $connection)
    {
        $this->connection = $connection;
    }

    /**
     * Whether this Terminal should dispatch its Events in a transaction or not. Defaults to <code>false</code>.
     * <p/>
     * Transactional behavior cannot be enabled if {@see setWaitForPublisherAck()} has been set to
     * <code>true</code>.
     *
     * @param boolean $transactional whether dispatching should be transactional or not
     * @throws \LogicException when enabling transactions while publisher acknowledgements are enabled
     */
    public function setTransactional($transactional)
    {
        if ($this->waitForAck && $transactional) {
            throw new \LogicException("Cannot set transactional behavior when 'waitForServerAck' is enabled.");
        }

        $this->isTransactional = $transactional;
    }

    /**
     * Enables or disables the RabbitMQ specific publisher acknowledgements (confirms). When confirms are enabled
     * and no unit of work is active, publish() waits for the pending acknowledgements of the messages it sent.
     * <p/>
     * Server ACKS cannot be enabled when transactions are enabled.
     * <p/>
     * See <a href="http://www.rabbitmq.com/confirms.html">RabbitMQ Documentation</a> for more information about
     * publisher acknowledgements.
     *
     * @param boolean $waitForPublisherAck whether or not to enable server acknowledgements (confirms)
     * @throws \LogicException when enabling acknowledgements while transactions are enabled
     */
    public function setWaitForPublisherAck($waitForPublisherAck)
    {
        if ($waitForPublisherAck && $this->isTransactional) {
            throw new \LogicException("Cannot set 'waitForPublisherAck' when using transactions.");
        }

        $this->waitForAck = $waitForPublisherAck;
    }

    /**
     * Sets the maximum amount of time the publisher may wait for the acknowledgement of published
     * messages. The value is passed unchanged to AMQPChannel::wait_for_pending_acks(); any exception
     * raised while waiting surfaces from publish() as an EventPublicationFailedException.
     * <p/>
     * This setting is only used when {@see setWaitForPublisherAck()} is set to <code>true</code>.
     * <p/>
     * NOTE(review): this value was documented as milliseconds, but php-amqplib appears to interpret
     * wait timeouts as seconds - confirm the unit against the installed php-amqplib version.
     *
     * @param integer $publisherAckTimeout The time to wait for confirms; the default is 0
     */
    public function setPublisherAckTimeout($publisherAckTimeout)
    {
        $this->publisherAckTimeout = $publisherAckTimeout;
    }

    /**
     * Whether or not messages should be marked as "durable" when sending them out.
     * <p/>
     * By default, messages are durable.
     * <p/>
     * The flag is handed to the DefaultAmqpMessageConverter built by this terminal. It is ignored when a
     * message converter was supplied through the constructor; in that case the supplied converter is
     * responsible for the durability of the messages it creates.
     *
     * @param boolean $durable whether or not messages should be durable
     */
    public function setDurable($durable)
    {
        $this->isDurable = $durable;

        // The default converter received the previous flag at construction time; rebuild it
        // so the new value actually takes effect.
        if ($this->usesDefaultConverter) {
            $this->messageConverter = $this->createDefaultMessageConverter();
        }
    }

    /**
     * Sets the name of the exchange to dispatch published messages to. Defaults to
     * {@see AmqpTerminal::DEFAULT_EXCHANGE_NAME}.
     *
     * @param string $exchangeName the name of the exchange to dispatch messages to
     */
    public function setExchangeName($exchangeName)
    {
        $this->exchangeName = $exchangeName;
    }

    /**
     * Converts the given events to AMQP messages and sends them over a channel opened for this call.
     * <p/>
     * When a unit of work is active the channel is handed to a ChannelTransactionUnitOfWorkListener
     * (presumably completing and closing the channel when the unit of work ends - confirm in that class).
     * Otherwise the transaction is committed or the pending acknowledgements are awaited, depending on the
     * configuration, and the channel is closed before returning.
     *
     * @param array $events The events to publish
     * @throws \RuntimeException when no connection has been configured
     * @throws EventPublicationFailedException when sending or completing the publication fails
     */
    public function publish(array $events)
    {
        if (null === $this->connection) {
            throw new \RuntimeException("The AMQPTerminal has no connection configured.");
        }

        $channel = $this->connection->channel();

        if ($this->isTransactional) {
            $channel->tx_select();
        }

        // Stays false until a unit-of-work listener has actually been given the channel, so a
        // failure before that point still closes the channel instead of leaking it.
        $channelHandedOver = false;

        try {
            if ($this->waitForAck) {
                $channel->confirm_select();
            }

            foreach ($events as $event) {
                $this->doSendMessage(
                    $channel,
                    $this->messageConverter->createAmqpMessage($event)
                );
            }

            $channelHandedOver = $this->completePublication($channel);
        } catch (\Exception $ex) {
            if ($this->isTransactional) {
                $this->tryRollback($channel);
            }

            throw new EventPublicationFailedException(
                "Failed to dispatch Events to the Message Broker.",
                0,
                $ex
            );
        } finally {
            if (!$channelHandedOver) {
                $this->tryClose($channel);
            }
        }
    }

    /**
     * Finishes a publication after all messages have been sent on the channel.
     *
     * @param AMQPChannel $channel The channel the messages were sent on
     * @return boolean true when the channel was handed to a unit-of-work listener and must stay open,
     * false when the caller remains responsible for closing it
     */
    private function completePublication(AMQPChannel $channel)
    {
        if (CurrentUnitOfWork::isStarted()) {
            CurrentUnitOfWork::get()->registerListener(
                new ChannelTransactionUnitOfWorkListener(
                    $this->logger,
                    $channel,
                    $this
                )
            );

            return true;
        }

        if ($this->isTransactional) {
            $channel->tx_commit();
        } elseif ($this->waitForAck) {
            $channel->wait_for_pending_acks($this->publisherAckTimeout);
        }

        return false;
    }

    /**
     * Replaces the logger used by this terminal (a NullLogger until this is called).
     *
     * @param LoggerInterface $logger
     * @return void
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Sets the RoutingKeyResolver that provides the Routing Key for each message to dispatch.
     * <p/>
     * The resolver is handed to the DefaultAmqpMessageConverter built by this terminal. It is ignored
     * when a message converter was supplied through the constructor.
     *
     * @param RoutingKeyResolverInterface $routingKeyResolver the RoutingKeyResolver to use
     */
    public function setRoutingKeyResolver(RoutingKeyResolverInterface $routingKeyResolver)
    {
        $this->routingKeyResolver = $routingKeyResolver;

        // The default converter captured the previous resolver at construction time; rebuild it
        // so the new resolver is the one used for outgoing messages.
        if ($this->usesDefaultConverter) {
            $this->messageConverter = $this->createDefaultMessageConverter();
        }
    }

}
339