Completed
Pull Request — 1.1 (#7)
by
unknown
01:25
created

AbstractConsumer::callback()   B

Complexity

Conditions 5
Paths 9

Size

Total Lines 24
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 24
rs 8.5125
c 0
b 0
f 0
cc 5
eloc 16
nc 9
nop 1
1
<?php
2
3
namespace Mouf\AmqpClient;
4
5
use Mouf\Utils\Log\Psr\ErrorLogLogger;
6
use Psr\Log\LoggerInterface;
7
8
/**
 * Base consumer that settles each message according to how
 * onMessageReceived() finishes.
 *
 * Subclasses implement onMessageReceived(); callback() wraps that call with
 * acknowledgement, logging and rejection handling.
 */
abstract class AbstractConsumer implements ConsumerInterface
{
    use ConsumerTrait;

    /**
     * Logger that receives every exception caught in callback().
     */
    private $logger;

    /**
     * @param LoggerInterface|null $logger Logger used to report failures; when
     *                                     none is supplied an ErrorLogLogger is
     *                                     created instead.
     */
    public function __construct(LoggerInterface $logger = null)
    {
        if (!$logger) {
            $logger = new ErrorLogLogger();
        }

        $this->logger = $logger;
    }

    /**
     * Processes one message and acknowledges or rejects it.
     *
     * The message is handed to onMessageReceived(). When that returns without
     * throwing, basic_ack() is called on the channel stored in
     * $msg->delivery_info['channel'] with the message's delivery tag.
     *
     * Any caught exception is logged at error level. If it implements
     * FatalExceptionInterface it is rethrown and no ack/nack is sent here.
     * Otherwise basic_nack() is called with the delivery tag, `true` as second
     * argument, and a third argument that is `true` for
     * RetryableExceptionInterface and `false` for every other \Exception.
     *
     * NOTE(review): assuming the channel is php-amqplib's AMQPChannel, the
     * second basic_nack() argument is `multiple` and the third is `requeue` --
     * confirm that rejecting with multiple=true is intended.
     *
     * @param mixed $msg Message object exposing a `delivery_info` array with
     *                   'channel' and 'delivery_tag' entries.
     *
     * @throws \Exception The caught exception, when it implements FatalExceptionInterface.
     */
    public function callback($msg)
    {
        try {
            $this->onMessageReceived($msg);

            // Acknowledge inside the try block so a failing ack goes through
            // the same catch handling as a failing handler.
            $channel = $msg->delivery_info['channel'];
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
        } catch (RetryableExceptionInterface $retryable) {
            $this->rejectAfterFailure($msg, $retryable, true);
        } catch (\Exception $failure) {
            $this->rejectAfterFailure($msg, $failure, false);
        }
    }

    /**
     * Logs a consumption failure, rethrows it when fatal, otherwise nacks the message.
     *
     * @param mixed $msg      Message whose delivery_info identifies channel and delivery tag.
     * @param mixed $error    The exception caught by callback().
     * @param bool  $lastFlag Value passed as the third basic_nack() argument.
     *
     * @throws \Exception $error itself, when it implements FatalExceptionInterface.
     */
    private function rejectAfterFailure($msg, $error, $lastFlag)
    {
        $this->logger->error("Exception caught while consuming message.", [
            'exception' => $error
        ]);

        // Fatal errors propagate to the caller without the message being nacked.
        if ($error instanceof FatalExceptionInterface) {
            throw $error;
        }

        $channel = $msg->delivery_info['channel'];
        $channel->basic_nack($msg->delivery_info['delivery_tag'], true, $lastFlag);
    }

    /**
     * Handles a single received message.
     *
     * Invoked by callback() before the message is acknowledged; exceptions
     * thrown here are caught, logged and turned into a nack or rethrown there.
     *
     * @param mixed $msg The message passed to callback().
     */
    abstract public function onMessageReceived($msg);
}
46