AbstractMessageProcessor   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 98
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 42
dl 0
loc 98
rs 10
c 0
b 0
f 0
wmc 9

4 Methods

Rating   Name   Duplication   Size   Complexity  
A getProcessedMessages() 0 3 1
A ack() 0 14 2
A consume() 0 27 4
A nack() 0 8 2
1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp\Processor;
8
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Psr\Log\LoggerAwareInterface;
11
use Psr\Log\LoggerAwareTrait;
12
13
/**
14
 * Class AbstractMessageProcessor
15
 *
16
 * @package JokerProject\LaravelAliyunAmqp\Processor
17
 */
18
abstract class AbstractMessageProcessor implements MessageProcessorInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const string Name of the dynamic property set on a message once it has
     *               been acked or nacked through ::ack / ::nack, so that
     *               ::consume does not settle it a second time.
     */
    const HANDLED_PROPERTY = 'handled_property';

    /**
     * Number of messages handed to ::consume so far (counted before processing,
     * so failed messages are included).
     *
     * @var int
     */
    private $messageCount = 0;

    /**
     * Processes one message and settles it with the broker.
     *
     * The message is acked only when ::processMessage returns exactly true;
     * any other return value, or any thrown \Throwable, results in a nack.
     * If the message was already settled from inside ::processMessage (via the
     * protected ::ack / ::nack methods), it is left untouched.
     *
     * {@inheritdoc}
     * @param AMQPMessage $message
     */
    public function consume(AMQPMessage $message)
    {
        $this->messageCount++;

        try {
            $response = $this->processMessage($message);

            // Already ack/nack from inside the processor using
            // the protected methods ::ack / ::nack
            if ($this->isHandled($message)) {
                $this->logDebug("Already handled!");
                return;
            }

            if ($response === true) {
                $this->ack($message);
            } else {
                $this->nack($message);
            }
        } catch (\Throwable $e) {
            $this->logFailure('process', $e, $message);

            // The processor may have settled the message before throwing;
            // settling the same delivery tag twice must be avoided.
            if (!$this->isHandled($message)) {
                $this->nack($message);
            }
        }
    }

    /**
     * Positively acknowledges the message on its channel and marks it as handled.
     *
     * Failures are logged and swallowed, in which case the message is NOT
     * marked as handled.
     *
     * @param AMQPMessage $message
     */
    protected function ack(AMQPMessage $message)
    {
        try {
            $this->logDebug(sprintf("Processed with success message %s", $message->getBody()));
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            $message->{self::HANDLED_PROPERTY} = true;
        } catch (\Throwable $e) {
            $this->logFailure('ack', $e, $message);
        }
    }

    /**
     * Negatively acknowledges the message on its channel and marks it as handled.
     *
     * Failures are logged and swallowed, in which case the message is NOT
     * marked as handled.
     *
     * @param AMQPMessage $message
     * @param bool $redeliver Passed as the third argument of basic_nack
     *                        (presumably the requeue flag - confirm against php-amqplib).
     */
    protected function nack(AMQPMessage $message, bool $redeliver = true)
    {
        try {
            $this->logDebug(sprintf("Did not processed with success message %s", $message->getBody()));
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, $redeliver);
            $message->{self::HANDLED_PROPERTY} = true;
        } catch (\Throwable $e) {
            // Record the actual failure instead of repeating the debug line.
            $this->logFailure('nack', $e, $message);
        }
    }

    /**
     * @return int Number of messages passed to ::consume so far.
     */
    public function getProcessedMessages(): int
    {
        return $this->messageCount;
    }

    /**
     * Handles the message payload.
     *
     * @param AMQPMessage $message
     * @return bool true to have ::consume ack the message; false to have it nacked.
     */
    abstract public function processMessage(AMQPMessage $message): bool;

    /**
     * Tells whether the message was already settled through ::ack / ::nack.
     *
     * @param AMQPMessage $message
     * @return bool
     */
    private function isHandled(AMQPMessage $message): bool
    {
        return property_exists($message, self::HANDLED_PROPERTY);
    }

    /**
     * Writes a debug line when a logger has been injected.
     *
     * The logger stays unset until setLogger() is called; a missing logger
     * must not prevent a message from being acked or nacked.
     *
     * @param string $text
     */
    private function logDebug(string $text)
    {
        if ($this->logger !== null) {
            $this->logger->debug($text);
        }
    }

    /**
     * Logs an exception raised while performing $action on a message.
     *
     * @param string $action Verb inserted into the log line ("process", "ack", "nack").
     * @param \Throwable $e
     * @param AMQPMessage $message
     */
    private function logFailure(string $action, \Throwable $e, AMQPMessage $message)
    {
        if ($this->logger === null) {
            return;
        }

        $this->logger->error(
            sprintf(
                "Could not %s message, got %s from %s in %d for message: %s",
                $action,
                get_class($e) . '-' . $e->getMessage(),
                (string)$e->getFile(),
                (int)$e->getLine(),
                (string)$message->getBody()
            )
        );
    }
}
117