Completed
Push — 1.1 ( 60d65e...be0fbd )
by David
06:58 queued 03:09
created

AbstractConsumer::callback()   B

Complexity

Conditions 5
Paths 9

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 23
rs 8.5906
c 0
b 0
f 0
cc 5
eloc 16
nc 9
nop 1
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use Mouf\Utils\Log\Psr\ErrorLogLogger;
6
use Psr\Log\LoggerInterface;
7
8
abstract class AbstractConsumer implements ConsumerInterface
9
{
10
    use ConsumerTrait;
11
12
    /**
13
     * @var LoggerInterface
14
     */
15
    private $logger;
16
17
    /**
18
     * AbstractConsumer constructor.
19
     * @param LoggerInterface|null $logger
20
     */
21
    public function __construct(LoggerInterface $logger = null)
22
    {
23
        $this->logger = $logger ?: new ErrorLogLogger();
24
    }
25
26
    public function callback($msg)
27
    {
28
        try {
29
            $this->onMessageReceived($msg);
30
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
31
        } catch (RetryableExceptionInterface $e) {
32
            $this->logger->error("Exception caught while consuming message.", [
33
                'exception' => $e
34
            ]);
35
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, true);
36
            if ($e instanceof FatalExceptionInterface) {
37
                throw $e;
38
            }
39
        } catch (\Exception $e) {
40
            $this->logger->error("Exception caught while consuming message.", [
41
                'exception' => $e
42
            ]);
43
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, false);
44
            if ($e instanceof FatalExceptionInterface) {
45
                throw $e;
46
            }
47
        }
48
    }
49
50
    abstract public function onMessageReceived($msg);
51
}
52