1
|
|
|
<?php |
2
|
|
|
namespace NeedleProject\LaravelRabbitMq\Processor; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
5
|
|
|
use Psr\Log\LoggerAwareInterface; |
6
|
|
|
use Psr\Log\LoggerAwareTrait; |
7
|
|
|
|
8
|
|
|
/** |
9
|
|
|
* Class AbstractMessageProcessor |
10
|
|
|
* |
11
|
|
|
* @package NeedleProject\LaravelRabbitMq\Processor |
12
|
|
|
* @author Adrian tilita <[email protected]> |
13
|
|
|
*/ |
14
|
|
|
abstract class AbstractMessageProcessor implements MessageProcessorInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const string Name of the property set on a message once it has been
     *               acked/nacked through ::ack / ::nack, so that ::consume
     *               does not acknowledge it a second time
     */
    const HANDLED_PROPERTY = 'handled_property';

    /**
     * @var int Number of messages handed to ::consume, whatever their outcome
     */
    private $messageCount = 0;

    /**
     * {@inheritdoc}
     *
     * Runs ::processMessage and then acknowledges the message based on the result:
     * ack when it returns exactly true, nack otherwise. If the processor already
     * called ::ack / ::nack itself, no further acknowledgement is sent. Any
     * Throwable raised while processing is logged and the message is nacked,
     * unless it was already acknowledged before the failure.
     *
     * @param AMQPMessage $message
     */
    public function consume(AMQPMessage $message)
    {
        $this->messageCount++;
        try {
            $response = $this->processMessage($message);
            // Already ack/nack from inside the processor using
            // the protected methods ::ack / ::nack
            if (property_exists($message, self::HANDLED_PROPERTY)) {
                $this->logger->debug("Already handled!");
                return;
            }
            if ($response === true) {
                $this->ack($message);
            } else {
                $this->nack($message);
            }
        } catch (\Throwable $e) {
            $this->logFailure('process', $e, $message);
            // The processor may have acked/nacked the message and only then thrown;
            // acknowledging the same delivery tag twice must be avoided.
            if (!property_exists($message, self::HANDLED_PROPERTY)) {
                $this->nack($message);
            }
        }
    }

    /**
     * Positively acknowledge the message on the channel it was delivered on and
     * flag it as handled. Failures are logged and not rethrown; in that case the
     * message is NOT flagged as handled.
     *
     * NOTE(review): the handled flag is written as a dynamic property on the
     * message object; dynamic properties are deprecated as of PHP 8.2 - confirm
     * against the supported PHP / php-amqplib versions.
     *
     * @param AMQPMessage $message
     */
    protected function ack(AMQPMessage $message)
    {
        try {
            $this->logger->debug(sprintf("Processed with success message %s", $message->getBody()));
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            $message->{self::HANDLED_PROPERTY} = true;
        } catch (\Throwable $e) {
            // Wording intentionally kept as "process" so the emitted log line is unchanged.
            $this->logFailure('process', $e, $message);
        }
    }

    /**
     * Negatively acknowledge the message on the channel it was delivered on and
     * flag it as handled. Failures are logged at error level and not rethrown;
     * in that case the message is NOT flagged as handled.
     *
     * @param AMQPMessage $message
     * @param bool        $redeliver Passed as the third argument of basic_nack
     *                               (presumably the "requeue" flag - see php-amqplib)
     */
    protected function nack(AMQPMessage $message, bool $redeliver = true)
    {
        try {
            $this->logger->debug(sprintf("Did not processed with success message %s", $message->getBody()));
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, $redeliver);
            $message->{self::HANDLED_PROPERTY} = true;
        } catch (\Throwable $e) {
            // A failed nack used to be hidden behind a repeated debug line;
            // report the actual exception instead.
            $this->logFailure('nack', $e, $message);
        }
    }

    /**
     * @return int Number of messages handed to ::consume so far
     */
    public function getProcessedMessages(): int
    {
        return $this->messageCount;
    }

    /**
     * Process a single message.
     *
     * ::consume acks the message when this returns true and nacks it otherwise,
     * unless the implementation already called ::ack / ::nack itself.
     *
     * @param AMQPMessage $message
     * @return bool
     */
    abstract public function processMessage(AMQPMessage $message): bool;

    /**
     * Log a Throwable raised while handling a message, together with the message body.
     *
     * @param string      $action  Verb describing the step that failed (e.g. "process", "nack")
     * @param \Throwable  $e       The failure to report
     * @param AMQPMessage $message The message being handled when the failure occurred
     */
    private function logFailure(string $action, \Throwable $e, AMQPMessage $message)
    {
        $this->logger->error(
            sprintf(
                "Could not %s message, got %s from %s in %d for message: %s",
                $action,
                get_class($e) . '-' . $e->getMessage(),
                (string)$e->getFile(),
                (int)$e->getLine(),
                (string)$message->getBody()
            )
        );
    }
}
113
|
|
|
|
This check examines a number of code elements and verifies that they conform to the given naming conventions.
You can set conventions for local variables, abstract classes, utility classes, constants, properties, methods, parameters, interfaces, classes, exceptions and special methods.