Completed
Push — master ( f58be8...855521 )
by Tilita
03:07
created

AbstractMessageProcessor::nack()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0932

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 5
cts 7
cp 0.7143
rs 9.9332
c 0
b 0
f 0
cc 2
nc 3
nop 2
crap 2.0932
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Processor;
3
4
use PhpAmqpLib\Message\AMQPMessage;
5
use Psr\Log\LoggerAwareInterface;
6
use Psr\Log\LoggerAwareTrait;
7
8
/**
9
 * Class AbstractMessageProcessor
10
 *
11
 * @package NeedleProject\LaravelRabbitMq\Processor
12
 * @author  Adrian tilita <[email protected]>
13
 */
14
abstract class AbstractMessageProcessor implements MessageProcessorInterface, LoggerAwareInterface
15
{
16
    use LoggerAwareTrait;
17
18
    /**
19
     * @const string Name of the dynamic property set on a message once it has been acked/nacked, so that consume() does not settle it a second time
20
     */
21
    const HANDLED_PROPERTY = 'handled_property';
22
23
    /**
24
     * @var int
25
     */
26
    private $messageCount = 0;
27
28
    /**
     * Consume one message: delegate to processMessage() and settle the delivery.
     *
     * The consumed-message counter is incremented before processing starts.
     * If the concrete processor already acked/nacked the message (detected via
     * the HANDLED_PROPERTY marker set by ::ack / ::nack) nothing more is done.
     * Otherwise the message is acked when processMessage() returns exactly
     * true and nacked in every other case.
     *
     * Any \Throwable raised while processing is logged at error level and the
     * message is nacked, unless it has already been settled.
     *
     * {@inheritdoc}
     * @param AMQPMessage $message
     */
    public function consume(AMQPMessage $message)
    {
        $this->messageCount++;
        try {
            $response = $this->processMessage($message);
            // Already ack/nack from inside the processor using
            // the protected methods ::ack / ::nack
            if (property_exists($message, self::HANDLED_PROPERTY)) {
                $this->logger->debug("Already handled!");
                return;
            }
            if ($response === true) {
                $this->ack($message);
            } else {
                $this->nack($message);
            }
        } catch (\Throwable $e) {
            $this->logger->error(
                sprintf(
                    "Could not process message, got %s from %s in %d for message: %s",
                    get_class($e) . '-' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine(),
                    (string)$message->getBody()
                )
            );
            // The processor may have acked/nacked the message itself and only
            // then thrown. Settling the same delivery tag a second time is a
            // channel-level protocol error, so nack only unsettled messages.
            if (!property_exists($message, self::HANDLED_PROPERTY)) {
                $this->nack($message);
            }
        }
    }
61
62
    /**
     * Acknowledge a message on the channel it was delivered through.
     *
     * A debug line is written first, then the delivery tag is acked and the
     * message is tagged with the HANDLED_PROPERTY marker. Any \Throwable raised
     * along the way is reported at error level and not rethrown.
     *
     * @param AMQPMessage $message
     */
    protected function ack(AMQPMessage $message)
    {
        $handledFlag = self::HANDLED_PROPERTY;

        try {
            $successNote = sprintf("Processed with success message %s", $message->getBody());
            $this->logger->debug($successNote);

            $channel = $message->delivery_info['channel'];
            $channel->basic_ack($message->delivery_info['delivery_tag']);

            // Marker read by consume() to avoid settling the message twice.
            $message->$handledFlag = true;
        } catch (\Throwable $failure) {
            $origin = get_class($failure) . '-' . $failure->getMessage();
            $report = sprintf(
                "Could not process message, got %s from %s in %d for message: %s",
                $origin,
                (string)$failure->getFile(),
                (int)$failure->getLine(),
                (string)$message->getBody()
            );
            $this->logger->error($report);
        }
    }
83
84
    /**
     * Negatively acknowledge (reject) a message on its delivery channel.
     *
     * A debug line is written first, then the delivery tag is nacked (single
     * message only) and the message is tagged with the HANDLED_PROPERTY marker.
     * A failure while doing so is logged at error level, including the
     * exception class, message, file and line, and is not rethrown.
     *
     * @param AMQPMessage $message
     * @param bool $redeliver Passed to basic_nack as its third argument
     *                        (the requeue flag); defaults to true
     */
    protected function nack(AMQPMessage $message, bool $redeliver = true)
    {
        try {
            $this->logger->debug(sprintf("Did not processed with success message %s", $message->getBody()));
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, $redeliver);
            $message->{self::HANDLED_PROPERTY} = true;
        } catch (\Throwable $e) {
            // Report why the nack failed instead of silently repeating the
            // debug line, mirroring the error reporting done in ack().
            $this->logger->error(
                sprintf(
                    "Could not nack message, got %s from %s in %d for message: %s",
                    get_class($e) . '-' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine(),
                    (string)$message->getBody()
                )
            );
        }
    }
98
99
    /**
     * Number of messages handed to consume() so far.
     *
     * The counter is incremented at the start of every consume() call, so it
     * also includes messages that ended up nacked or whose processing threw.
     *
     * @return int
     */
    public function getProcessedMessages(): int
    {
        return $this->messageCount;
    }
106
107
    /**
     * Handle a single message; implemented by concrete processors.
     *
     * consume() acks the message when this returns exactly true and nacks it
     * otherwise, unless the implementation already settled the message itself
     * through the protected ::ack / ::nack methods. An exception thrown from
     * here is caught and logged by consume().
     *
     * @param AMQPMessage $message
     * @return bool
     */
    abstract public function processMessage(AMQPMessage $message): bool;
0 ignored issues
show
Coding Style introduced by
function processMessage() does not seem to conform to the naming convention (^(?:is|has|should|may|supports)).

This check examines a number of code elements and verifies that they conform to the given naming conventions.

You can set conventions for local variables, abstract classes, utility classes, constant, properties, methods, parameters, interfaces, classes, exceptions and special methods.

Loading history...
112
}
113