RetryProcessor   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 129
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 91.67%

Importance

Changes 0
Metric Value
dl 0
loc 129
c 0
b 0
f 0
wmc 13
lcom 1
cbo 6
ccs 33
cts 36
cp 0.9167
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 2
A setProcessor() 0 4 1
A getProcessor() 0 4 1
A setMaxAttempts() 0 4 1
A getMaxAttempts() 0 4 1
A process() 0 13 2
A retryMessage() 0 11 2
A getAttemptValue() 0 16 3
1
<?php
2
3
namespace TreeHouse\Queue\Processor\Retry;
4
5
use Psr\Log\LoggerInterface;
6
use Psr\Log\NullLogger;
7
use TreeHouse\Queue\Amqp\EnvelopeInterface;
8
use TreeHouse\Queue\Exception\ProcessExhaustedException;
9
use TreeHouse\Queue\Processor\ProcessorInterface;
10
11
/**
 * Processor that performs a number of attempts when a message could not be processed.
 *
 * It decorates another processor: when the inner processor throws an exception,
 * the failure is logged and the message is handed to the retry strategy, until
 * the configured maximum number of attempts has been reached.
 */
class RetryProcessor implements ProcessorInterface
{
    /**
     * Name of the envelope header that holds the attempt counter.
     */
    const PROPERTY_KEY = 'attempt';

    /**
     * The inner processor whose failures are retried.
     *
     * @var ProcessorInterface
     */
    protected $processor;

    /**
     * Strategy that actually re-publishes/re-queues a failed message.
     *
     * @var RetryStrategyInterface
     */
    protected $strategy;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Maximum number of attempts before a message is considered exhausted.
     *
     * @var int
     */
    protected $maxAttempts = 2;

    /**
     * @param ProcessorInterface     $processor The processor to delegate to
     * @param RetryStrategyInterface $strategy  The strategy used to retry a failed message
     * @param LoggerInterface        $logger    Optional logger; a NullLogger is used when omitted
     */
    public function __construct(ProcessorInterface $processor, RetryStrategyInterface $strategy, LoggerInterface $logger = null)
    {
        $this->processor = $processor;
        $this->strategy = $strategy;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * Replaces the inner processor.
     *
     * @param ProcessorInterface $processor
     */
    public function setProcessor($processor)
    {
        $this->processor = $processor;
    }

    /**
     * Returns the inner processor.
     *
     * @return ProcessorInterface
     */
    public function getProcessor()
    {
        return $this->processor;
    }

    /**
     * Sets the maximum number of attempts. The value is stored as given, without validation.
     *
     * @param int $maxAttempts
     */
    public function setMaxAttempts($maxAttempts)
    {
        $this->maxAttempts = $maxAttempts;
    }

    /**
     * Returns the maximum number of attempts.
     *
     * @return int
     */
    public function getMaxAttempts()
    {
        return $this->maxAttempts;
    }

    /**
     * Delegates to the inner processor. When that throws an exception, the error
     * is logged and the result of the retry attempt is returned instead.
     *
     * {@inheritdoc}
     *
     * @throws ProcessExhaustedException When the message has no attempts left
     */
    public function process(EnvelopeInterface $envelope)
    {
        try {
            $result = $this->processor->process($envelope);
        } catch (\Exception $exception) {
            $this->logger->error($exception->getMessage(), ['message' => $envelope->getDeliveryTag()]);
            $this->logger->debug($exception->getTraceAsString());

            $result = $this->retryMessage($envelope, $exception);
        }

        return $result;
    }

    /**
     * Hands the message to the retry strategy, unless it has already used up
     * all of its attempts.
     *
     * @param EnvelopeInterface $envelope  The message that failed
     * @param \Exception        $exception The exception that caused the failure, if any
     *
     * @throws ProcessExhaustedException When the current attempt is at or above the maximum
     * @throws \LogicException           When the attempt header is not a positive number
     *
     * @return bool The value returned by the retry strategy
     */
    protected function retryMessage(EnvelopeInterface $envelope, \Exception $exception = null)
    {
        $attempt = $this->getAttemptValue($envelope);
        if ($attempt >= $this->maxAttempts) {
            throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt));
        }

        // Logged before the counter is incremented, so this is the number of attempts still available.
        $this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));

        // The strategy receives the number of the upcoming attempt.
        return $this->strategy->retry($envelope, ++$attempt, $exception);
    }

    /**
     * Reads the current attempt number from the envelope header.
     *
     * @param EnvelopeInterface $envelope
     *
     * @throws \LogicException When the header is present but does not represent a positive number
     *
     * @return int The attempt number, or 1 when the header is not set
     */
    protected function getAttemptValue(EnvelopeInterface $envelope)
    {
        $header = $envelope->getHeader(self::PROPERTY_KEY);

        // No header means this is the first attempt.
        // NOTE(review): `false` is the "not set" value this code has always checked for; `null` is
        // accepted as well on the assumption that some envelope implementations return it for a
        // missing header - confirm against EnvelopeInterface::getHeader().
        if (false === $header || null === $header) {
            return 1;
        }

        $attempt = (int) $header;

        if ($attempt < 1) {
            // Report the header as it was received; the cast value would hide what was actually sent.
            throw new \LogicException(
                sprintf('Attempt can only be a positive number, got "%s"', json_encode($header))
            );
        }

        return $attempt;
    }
}
143