Completed
Push — master ( f58be8...855521 )
by Tilita
03:07
created

AbstractMessageProcessor   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 99
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 76.92%

Importance

Changes 0
Metric Value
wmc 9
lcom 1
cbo 3
dl 0
loc 99
ccs 30
cts 39
cp 0.7692
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A consume() 0 29 4
A ack() 0 18 2
A nack() 0 10 2
A getProcessedMessages() 0 4 1
processMessage() 0 1 ?
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Processor;
3
4
use PhpAmqpLib\Message\AMQPMessage;
5
use Psr\Log\LoggerAwareInterface;
6
use Psr\Log\LoggerAwareTrait;
7
8
/**
9
 * Class AbstractMessageProcessor
10
 *
11
 * @package NeedleProject\LaravelRabbitMq\Processor
12
 * @author  Adrian tilita <[email protected]>
13
 */
14
abstract class AbstractMessageProcessor implements MessageProcessorInterface, LoggerAwareInterface
15
{
16
    use LoggerAwareTrait;
17
18
    /**
19
     * @const string Key used on message to identify if we ack/nack via the child
20
     */
21
    const HANDLED_PROPERTY = 'handled_property';
22
23
    /**
24
     * @var int
25
     */
26
    private $messageCount = 0;
27
28
    /**
29
     * {@inheritdoc}
30
     * @param AMQPMessage $message
31
     */
32 4
    public function consume(AMQPMessage $message)
33
    {
34 4
        $this->messageCount++;
35
        try {
36 4
            $response = $this->processMessage($message);
37
            // Already ack/nack from inside the processor using
38
            // the protected methods ::ack / ::nack
39 3
            if (property_exists($message, self::HANDLED_PROPERTY)) {
40 1
                $this->logger->debug("Already handled!");
41 1
                return;
42
            }
43 3
            if ($response === true) {
44 2
                $this->ack($message);
45
            } else {
46 3
                $this->nack($message);
47
            }
48 1
        } catch (\Throwable $e) {
49 1
            $this->logger->error(
50
                sprintf(
51 1
                    "Could not process message, got %s from %s in %d for message: %s",
52 1
                    get_class($e) . '-' . $e->getMessage(),
53 1
                    (string)$e->getFile(),
54 1
                    (int)$e->getLine(),
55 1
                    (string)$message->getBody()
56
                )
57
            );
58 1
            $this->nack($message);
59
        }
60 4
    }
61
62
    /**
63
     * @param AMQPMessage $message
64
     */
65 2
    protected function ack(AMQPMessage $message)
66
    {
67
        try {
68 2
            $this->logger->debug(sprintf("Processed with success message %s", $message->getBody()));
69 2
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
70 2
            $message->{self::HANDLED_PROPERTY} = true;
71
        } catch (\Throwable $e) {
72
            $this->logger->error(
73
                sprintf(
74
                    "Could not process message, got %s from %s in %d for message: %s",
75
                    get_class($e) . '-' . $e->getMessage(),
76
                    (string)$e->getFile(),
77
                    (int)$e->getLine(),
78
                    (string)$message->getBody()
79
                )
80
            );
81
        }
82 2
    }
83
84
    /**
85
     * @param AMQPMessage $message
86
     * @param bool $redeliver
87
     */
88 2
    protected function nack(AMQPMessage $message, bool $redeliver = true)
89
    {
90
        try {
91 2
            $this->logger->debug(sprintf("Did not processed with success message %s", $message->getBody()));
92 2
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, $redeliver);
93 2
            $message->{self::HANDLED_PROPERTY} = true;
94
        } catch (\Throwable $e) {
95
            $this->logger->debug(sprintf("Did not processed with success message %s", $message->getBody()));
96
        }
97 2
    }
98
99
    /**
100
     * @return int
101
     */
102 1
    public function getProcessedMessages(): int
103
    {
104 1
        return $this->messageCount;
105
    }
106
107
    /**
108
     * @param AMQPMessage $message
109
     * @return bool
110
     */
111
    abstract public function processMessage(AMQPMessage $message): bool;
0 ignored issues
show
Coding Style introduced by
function processMessage() does not seem to conform to the naming convention (^(?:is|has|should|may|supports)).

This check examines a number of code elements and verifies that they conform to the given naming conventions.

You can set conventions for local variables, abstract classes, utility classes, constant, properties, methods, parameters, interfaces, classes, exceptions and special methods.

Loading history...
112
}
113