Completed
Pull Request — 1.1 (#7)
by
unknown
31:40
created

AbstractConsumer::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 1
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use Mouf\Utils\Log\Psr\ErrorLogLogger;
6
use Psr\Log\LoggerInterface;
7
8
/**
 * Base class for consumers that settle each message after handling it.
 *
 * Subclasses implement onMessageReceived(); callback() wraps that call with
 * acknowledgement, negative acknowledgement and error logging.
 */
abstract class AbstractConsumer implements ConsumerInterface
{
    use ConsumerTrait;

    /**
     * Logger that receives every exception caught in callback().
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param LoggerInterface|null $logger Logger used to report failures; an
     *                                     ErrorLogLogger is created when none is given.
     */
    public function __construct(LoggerInterface $logger = null)
    {
        if ($logger === null) {
            $logger = new ErrorLogLogger();
        }

        $this->logger = $logger;
    }

    /**
     * Passes the message to onMessageReceived() and settles it on its channel.
     *
     * - No exception: basic_ack() is called with the message's delivery tag.
     * - RetryableExceptionInterface: the exception is logged and basic_nack()
     *   is called with (delivery tag, true, true).
     * - Any other \Exception: the exception is logged and basic_nack() is
     *   called with (delivery tag, true, false).
     *
     * In both failure cases the exception is rethrown only when it also
     * implements FatalExceptionInterface; otherwise it is swallowed after
     * being logged.
     *
     * NOTE(review): presumably the channel is a php-amqplib AMQPChannel, whose
     * basic_nack() arguments are ($delivery_tag, $multiple, $requeue) — confirm.
     * If so, the second argument makes the nack apply to every outstanding
     * delivery up to this tag, which may not be intended.
     *
     * @param mixed $msg Message exposing a delivery_info array with the
     *                   'channel' and 'delivery_tag' entries.
     *
     * @throws \Exception When the caught exception implements FatalExceptionInterface.
     */
    public function callback($msg)
    {
        try {
            $this->onMessageReceived($msg);

            $channel = $msg->delivery_info['channel'];
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
        } catch (RetryableExceptionInterface $retryable) {
            $this->logger->error("Exception caught while consuming message.", ['exception' => $retryable]);

            $channel = $msg->delivery_info['channel'];
            $channel->basic_nack($msg->delivery_info['delivery_tag'], true, true);

            if (!($retryable instanceof FatalExceptionInterface)) {
                return;
            }

            throw $retryable;
        } catch (\Exception $failure) {
            $this->logger->error("Exception caught while consuming message.", ['exception' => $failure]);

            $channel = $msg->delivery_info['channel'];
            $channel->basic_nack($msg->delivery_info['delivery_tag'], true, false);

            if (!($failure instanceof FatalExceptionInterface)) {
                return;
            }

            throw $failure;
        }
    }

    /**
     * Handles one message; invoked by callback() before the message is acknowledged.
     *
     * An exception thrown from here makes callback() log it and negatively
     * acknowledge the message instead of acknowledging it.
     *
     * @param mixed $msg The message given to callback().
     */
    abstract public function onMessageReceived($msg);
}
46