Completed
Push — master ( 3c0f5e...015fa5 )
by Thibaud
03:56
created

AmqpMessageQueue::setLogger()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Alchemy\Queue\Amqp;
4
5
use Alchemy\Queue\Message;
6
use Alchemy\Queue\MessageHandlerResolver;
7
use Alchemy\Queue\MessageHandlingException;
8
use Alchemy\Queue\MessagePublishingException;
9
use Alchemy\Queue\MessageQueue;
10
use Psr\Log\LoggerAwareInterface;
11
use Psr\Log\LoggerInterface;
12
use Psr\Log\NullLogger;
13
use Ramsey\Uuid\Uuid;
14
15
/**
 * Message queue implementation backed by the php-amqp extension.
 *
 * Messages are published through an AMQP exchange, using the name of the
 * wrapped queue as routing key, and consumed from that same queue. Every
 * step is reported at debug level through a PSR-3 logger; a NullLogger is
 * used until setLogger() is called.
 */
class AmqpMessageQueue implements MessageQueue, LoggerAwareInterface
{
    /**
     * Exchange that outgoing messages are published to.
     *
     * @var \AMQPExchange
     */
    private $exchange;

    /**
     * Queue that messages are consumed from. Its name doubles as the
     * routing key when publishing.
     *
     * @var \AMQPQueue
     */
    private $queue;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param \AMQPExchange $exchange Exchange used by publish()
     * @param \AMQPQueue    $queue    Queue used by handle(); its name is the publish routing key
     */
    public function __construct(\AMQPExchange $exchange, \AMQPQueue $queue)
    {
        $this->exchange = $exchange;
        $this->queue = $queue;
        // Default to a no-op logger so that logging calls never need a null check.
        $this->logger = new NullLogger();
    }

    /**
     * Sets a logger instance on the object
     *
     * @param LoggerInterface $logger
     * @return void
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Publishes a message to the exchange, routed with the queue name.
     *
     * The message correlation id is forwarded as the AMQP "correlation_id"
     * attribute.
     *
     * @param Message $message
     * @return void
     * @throws MessagePublishingException When the exchange explicitly reports failure (returns false)
     */
    public function publish(Message $message)
    {
        $this->logger->debug('Publishing message to queue: ' . $this->queue->getName());

        $attributes = ['correlation_id' => $message->getCorrelationId()];

        // NOTE(review): AMQP_DURABLE is documented by php-amqp as an
        // exchange/queue declaration flag, whereas publish() documents
        // AMQP_MANDATORY and AMQP_IMMEDIATE. If persistent messages were
        // intended, the usual way is the 'delivery_mode' => 2 attribute.
        // Left unchanged here; confirm the intent.
        $result = $this->exchange->publish($message->getBody(), $this->queue->getName(), AMQP_DURABLE, $attributes);

        // Only an explicit false is a failure. Newer php-amqp releases return
        // nothing (null) on success and signal failures with exceptions, so a
        // loose "! $result" check would flag every successful publish.
        if (false === $result) {
            throw new MessagePublishingException();
        }
    }

    /**
     * Consumes from the queue and dispatches the received message to the
     * handler chosen by the resolver.
     *
     * The message is acknowledged when the handler completes, and negatively
     * acknowledged when the handler throws a MessageHandlingException. Any
     * other exception propagates to the caller without ack or nack.
     *
     * @param MessageHandlerResolver $resolver
     * @return void
     */
    public function handle(MessageHandlerResolver $resolver)
    {
        $this->logger->debug('Consuming messages from AMQP queue: ' . $this->queue->getName());

        // Pass the consumer tag as a real string instead of relying on the
        // extension to coerce the Uuid object through __toString().
        $consumerTag = Uuid::uuid4()->toString();

        $this->queue->consume(function (\AMQPEnvelope $envelope) use ($resolver) {
            $message = new Message($envelope->getBody(), $envelope->getCorrelationId());
            $handler = $resolver->resolveHandler($message);

            try {
                $this->logger->debug('Dispatching message to handler');
                $handler->handle($message);
                $this->ackMessage($envelope);
            } catch (MessageHandlingException $exception) {
                $this->logger->error('Caught exception while handling message: ' . $exception->getMessage());
                $this->nackMessage($envelope);
            }

            // Per the php-amqp documentation, returning false from the
            // callback leaves the consume loop, so one message is processed
            // per handle() call.
            return false;
        }, AMQP_NOPARAM, $consumerTag);
    }

    /**
     * Logs and acknowledges the given envelope on the queue.
     *
     * @param \AMQPEnvelope $envelope
     * @return void
     */
    private function ackMessage(\AMQPEnvelope $envelope)
    {
        $this->logger->debug('ACK message', $this->envelopeContext($envelope));

        $this->queue->ack($envelope->getDeliveryTag(), AMQP_NOPARAM);
    }

    /**
     * Logs and negatively acknowledges the given envelope on the queue.
     *
     * NOTE(review): AMQP_NOPARAM means AMQP_REQUEUE is not set, so the
     * message is presumably not redelivered by the broker. Confirm this is
     * the intended failure policy.
     *
     * @param \AMQPEnvelope $envelope
     * @return void
     */
    private function nackMessage(\AMQPEnvelope $envelope)
    {
        $this->logger->debug('NACK message', $this->envelopeContext($envelope));

        $this->queue->nack($envelope->getDeliveryTag(), AMQP_NOPARAM);
    }

    /**
     * Builds the log context shared by the ACK and NACK debug entries.
     *
     * @param \AMQPEnvelope $envelope
     * @return array Map with the "correlation_id" and "body" of the envelope
     */
    private function envelopeContext(\AMQPEnvelope $envelope)
    {
        return [
            'correlation_id' => $envelope->getCorrelationId(),
            'body' => $envelope->getBody()
        ];
    }
}
109