Completed
Pull Request — master (#18)
by Peter
04:08
created

RetryProcessor   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 94.44%

Importance

Changes 3
Bugs 0 Features 3
Metric Value
wmc 14
c 3
b 0
f 3
lcom 1
cbo 5
dl 0
loc 132
ccs 34
cts 36
cp 0.9444
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A setProcessor() 0 4 1
A getProcessor() 0 4 1
A setMaxAttempts() 0 4 1
A getMaxAttempts() 0 4 1
A process() 0 14 3
A retryMessage() 0 13 3
A getAttemptValue() 0 16 3
1
<?php
2
3
namespace TreeHouse\Queue\Processor\Retry;
4
5
use Psr\Log\LoggerInterface;
6
use TreeHouse\Queue\Amqp\EnvelopeInterface;
7
use TreeHouse\Queue\Exception\ProcessExhaustedException;
8
use TreeHouse\Queue\Processor\ProcessorInterface;
9
10
/**
 * Processor that performs a number of attempts when a message could not be processed.
 *
 * It wraps another processor: when that processor throws an exception, the
 * failure is logged (if a logger was given) and the message is handed to the
 * retry strategy, until the maximum number of attempts has been reached.
 */
class RetryProcessor implements ProcessorInterface
{
    /**
     * Name of the envelope header from which the current attempt number is read.
     */
    const PROPERTY_KEY = 'attempt';

    /**
     * The wrapped processor that does the actual work.
     *
     * @var ProcessorInterface
     */
    protected $processor;

    /**
     * Strategy that is asked to retry a message after a failed attempt.
     *
     * @var RetryStrategyInterface
     */
    protected $strategy;

    /**
     * Optional logger; nothing is logged when it is null.
     *
     * @var LoggerInterface|null
     */
    protected $logger;

    /**
     * Number of attempts after which a failing message is considered exhausted.
     *
     * @var int
     */
    protected $maxAttempts = 2;

    /**
     * @param ProcessorInterface     $processor The processor to delegate to
     * @param RetryStrategyInterface $strategy  The strategy used to retry failed messages
     * @param LoggerInterface|null   $logger    Optional logger for failures and retries
     */
    public function __construct(ProcessorInterface $processor, RetryStrategyInterface $strategy, LoggerInterface $logger = null)
    {
        $this->processor = $processor;
        $this->strategy = $strategy;
        $this->logger = $logger;
    }

    /**
     * Replaces the wrapped processor.
     *
     * @param ProcessorInterface $processor
     */
    public function setProcessor($processor)
    {
        $this->processor = $processor;
    }

    /**
     * @return ProcessorInterface The wrapped processor
     */
    public function getProcessor()
    {
        return $this->processor;
    }

    /**
     * Sets the attempt number at which a failing message is no longer retried.
     *
     * @param int $maxAttempts
     */
    public function setMaxAttempts($maxAttempts)
    {
        $this->maxAttempts = $maxAttempts;
    }

    /**
     * @return int The attempt number at which a failing message is no longer retried
     */
    public function getMaxAttempts()
    {
        return $this->maxAttempts;
    }

    /**
     * Delegates to the wrapped processor. When that throws an exception, the
     * error is logged and the result of the retry attempt is returned instead.
     * Exceptions raised while retrying (exhausted attempts, invalid attempt
     * header) are not caught here and propagate to the caller.
     *
     * @inheritdoc
     */
    public function process(EnvelopeInterface $envelope)
    {
        try {
            $result = $this->processor->process($envelope);
        } catch (\Exception $exception) {
            if ($this->logger) {
                $this->logger->error($exception->getMessage(), ['message' => $envelope->getDeliveryTag()]);
            }

            $result = $this->retryMessage($envelope, $exception);
        }

        return $result;
    }

    /**
     * Hands a failed message to the retry strategy, unless the maximum number
     * of attempts has been reached.
     *
     * @param EnvelopeInterface $envelope  The message that could not be processed
     * @param \Exception        $exception The exception that caused the failure, if any
     *
     * @throws ProcessExhaustedException When the current attempt has reached the maximum
     * @throws \LogicException           When the attempt header holds an invalid value
     *
     * @return bool The result of the retry strategy
     */
    protected function retryMessage(EnvelopeInterface $envelope, \Exception $exception = null)
    {
        $attempt = $this->getAttemptValue($envelope);
        if ($attempt >= $this->maxAttempts) {
            throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt));
        }

        if ($this->logger) {
            $this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));
        }

        // The strategy receives the number of the upcoming attempt.
        return $this->strategy->retry($envelope, $attempt + 1, $exception);
    }

    /**
     * Reads the current attempt number from the envelope's attempt header.
     *
     * @param EnvelopeInterface $envelope
     *
     * @throws \LogicException When the header value is not a positive number
     *
     * @return int The attempt number, or 1 when the header lookup returns false
     */
    protected function getAttemptValue(EnvelopeInterface $envelope)
    {
        $headerValue = $envelope->getHeader(self::PROPERTY_KEY);

        // A strict false from the header lookup is treated as "first attempt".
        if (false === $headerValue) {
            return 1;
        }

        $attempt = (int) $headerValue;

        if ($attempt < 1) {
            // Report the raw header value: the cast result would hide what was
            // actually received (e.g. "abc" would be shown as 0).
            throw new \LogicException(
                sprintf('Attempt can only be a positive number, got "%s"', json_encode($headerValue))
            );
        }

        return $attempt;
    }
}
145