1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Alchemy\Queue\Amqp; |
4
|
|
|
|
5
|
|
|
use Alchemy\Queue\Message; |
6
|
|
|
use Alchemy\Queue\MessageHandlerResolver; |
7
|
|
|
use Alchemy\Queue\MessageHandlingException; |
8
|
|
|
use Alchemy\Queue\MessagePublishingException; |
9
|
|
|
use Alchemy\Queue\MessageQueue; |
10
|
|
|
use Psr\Log\LoggerAwareInterface; |
11
|
|
|
use Psr\Log\LoggerInterface; |
12
|
|
|
use Psr\Log\NullLogger; |
13
|
|
|
use Ramsey\Uuid\Uuid; |
14
|
|
|
|
15
|
|
|
/**
 * Message queue implementation backed by the PHP AMQP extension.
 *
 * Messages are published through an exchange using the queue name as routing
 * key, and consumed from the queue one message per call to handle().
 */
class AmqpMessageQueue implements MessageQueue, LoggerAwareInterface
{
    /**
     * Exchange that outgoing messages are published to.
     *
     * @var \AMQPExchange
     */
    private $exchange;

    /**
     * Queue that messages are consumed from; its name is also used as the
     * routing key when publishing.
     *
     * @var \AMQPQueue
     */
    private $queue;

    /**
     * Logger used for debug and error output; defaults to a NullLogger.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param \AMQPExchange $exchange Exchange used to publish messages
     * @param \AMQPQueue    $queue    Queue used to consume messages
     */
    public function __construct(\AMQPExchange $exchange, \AMQPQueue $queue)
    {
        $this->exchange = $exchange;
        $this->queue = $queue;
        $this->logger = new NullLogger();
    }

    /**
     * Sets a logger instance on the object
     *
     * @param LoggerInterface $logger
     * @return void
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Publishes a message to the exchange, routed by the queue name.
     *
     * @param Message $message
     * @throws MessagePublishingException When the exchange reports that publishing failed
     */
    public function publish(Message $message)
    {
        $routingKey = $this->queue->getName();

        $this->logger->debug('Publishing message to queue: ' . $routingKey);

        $attributes = [
            'correlation_id' => $message->getCorrelationId(),
            // AMQP delivery mode 2 marks the message as persistent. This is the
            // per-message counterpart of a durable queue; the AMQP_DURABLE flag
            // only applies to exchange/queue declaration and is ignored by
            // AMQPExchange::publish().
            'delivery_mode' => 2,
        ];

        // publish() only honours AMQP_MANDATORY / AMQP_IMMEDIATE as flags.
        $result = $this->exchange->publish($message->getBody(), $routingKey, AMQP_NOPARAM, $attributes);

        if (! $result) {
            throw new MessagePublishingException();
        }
    }

    /**
     * Consumes a single message from the queue and dispatches it to the
     * handler returned by the resolver.
     *
     * The message is acknowledged when the handler completes, and rejected
     * (NACK) when the handler throws a MessageHandlingException. Any other
     * exception propagates to the caller without the message being settled.
     *
     * @param MessageHandlerResolver $resolver
     */
    public function handle(MessageHandlerResolver $resolver)
    {
        $this->logger->debug('Consuming messages from AMQP queue: ' . $this->queue->getName());

        // The consumer tag must be a string; pass the textual UUID rather than
        // relying on implicit object-to-string conversion.
        $consumerTag = Uuid::uuid4()->toString();

        $this->queue->consume(function (\AMQPEnvelope $envelope) use ($resolver) {
            $message = new Message($envelope->getBody(), $envelope->getCorrelationId());
            $handler = $resolver->resolveHandler($message);

            try {
                $this->logger->debug('Dispatching message to handler');
                $handler->handle($message);
                $this->ackMessage($envelope);
            } catch (MessageHandlingException $exception) {
                $this->logger->error('Caught exception while handling message: ' . $exception->getMessage());
                $this->nackMessage($envelope);
            }

            // Returning false makes consume() hand control back to the caller
            // after this message instead of blocking for the next one.
            return false;
        }, AMQP_NOPARAM, $consumerTag);
    }

    /**
     * Logs and positively acknowledges the given envelope.
     *
     * @param \AMQPEnvelope $envelope
     */
    private function ackMessage(\AMQPEnvelope $envelope)
    {
        $this->logger->debug('ACK message', $this->describeEnvelope($envelope));

        $this->queue->ack($envelope->getDeliveryTag(), AMQP_NOPARAM);
    }

    /**
     * Logs and negatively acknowledges the given envelope.
     *
     * NOTE(review): AMQP_REQUEUE is not passed, so the broker is not asked to
     * requeue the message — confirm this is the intended failure policy.
     *
     * @param \AMQPEnvelope $envelope
     */
    private function nackMessage(\AMQPEnvelope $envelope)
    {
        $this->logger->debug('NACK message', $this->describeEnvelope($envelope));

        $this->queue->nack($envelope->getDeliveryTag(), AMQP_NOPARAM);
    }

    /**
     * Builds the logging context shared by the ACK and NACK log entries.
     *
     * @param \AMQPEnvelope $envelope
     * @return array Context with the envelope's correlation id and body
     */
    private function describeEnvelope(\AMQPEnvelope $envelope)
    {
        return [
            'correlation_id' => $envelope->getCorrelationId(),
            'body' => $envelope->getBody(),
        ];
    }
}
109
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.