Completed
Pull Request — master (#18)
by Peter
04:08
created

RetryProcessor::setCooldownTime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace TreeHouse\Queue\Processor\Retry;
4
5
use Psr\Log\LoggerInterface;
6
use TreeHouse\Queue\Amqp\EnvelopeInterface;
7
use TreeHouse\Queue\Exception\ProcessExhaustedException;
8
use TreeHouse\Queue\Processor\ProcessorInterface;
9
10
/**
11
 * Processor that performs a number of attempts when a message could not be processed.
12
 */
13
class RetryProcessor implements ProcessorInterface
14
{
15
    const PROPERTY_KEY = 'attempt';
16
17
    /**
18
     * @var ProcessorInterface
19
     */
20
    protected $processor;
21
22
    /**
23
     * @var RetryStrategyInterface
24
     */
25
    protected $strategy;
26
27
    /**
28
     * @var LoggerInterface
29
     */
30
    protected $logger;
31
32
    /**
33
     * @var int
34
     */
35
    protected $maxAttempts = 2;
36
37
    /**
38
     * @param ProcessorInterface     $processor
39
     * @param RetryStrategyInterface $strategy
40
     * @param LoggerInterface        $logger
41
     */
42
    public function __construct(ProcessorInterface $processor, RetryStrategyInterface $strategy, LoggerInterface $logger = null)
43
    {
44
        $this->processor = $processor;
45
        $this->strategy = $strategy;
46
        $this->logger = $logger;
47 6
    }
48
49 6
    /**
50 6
     * @param ProcessorInterface $processor
51 6
     */
52 6
    public function setProcessor($processor)
53
    {
54
        $this->processor = $processor;
55
    }
56
57 1
    /**
58
     * @return ProcessorInterface
59 1
     */
60 1
    public function getProcessor()
61
    {
62
        return $this->processor;
63
    }
64
65 1
    /**
66
     * @param int $maxAttempts
67 1
     */
68
    public function setMaxAttempts($maxAttempts)
69
    {
70
        $this->maxAttempts = $maxAttempts;
71
    }
72
73 1
    /**
74
     * @return int
75 1
     */
76 1
    public function getMaxAttempts()
77
    {
78
        return $this->maxAttempts;
79
    }
80
81 1
    /**
82
     * @inheritdoc
83 1
     */
84
    public function process(EnvelopeInterface $envelope)
85
    {
86
        try {
87
            $result = $this->processor->process($envelope);
88
        } catch (\Exception $exception) {
89 2
            if ($this->logger) {
90
                $this->logger->error($exception->getMessage(), ['message' => $envelope->getDeliveryTag()]);
91 2
            }
92 2
93
            $result = $this->retryMessage($envelope, $exception);
94
        }
95
96
        return $result;
97 1
    }
98
99 1
    /**
100
     * @param EnvelopeInterface $envelope
101
     * @param \Exception        $exception
102
     *
103
     * @throws ProcessExhaustedException
104
     *
105 4
     * @return bool
106
     */
107
    protected function retryMessage(EnvelopeInterface $envelope, \Exception $exception = null)
108 4
    {
109 1
        $attempt = $this->getAttemptValue($envelope);
110 1
        if ($attempt >= $this->maxAttempts) {
111
            throw new ProcessExhaustedException(sprintf('Exhausted after failing %d attempt(s)', $attempt));
112 1
        }
113
114
        if ($this->logger) {
115
            $this->logger->debug(sprintf('Requeueing message (%d attempts left)', $this->maxAttempts - $attempt));
116
        }
117 4
118 3
        return $this->strategy->retry($envelope, ++$attempt, $exception);
119
    }
120
121 3
    /**
122
     * @param EnvelopeInterface $envelope
123
     *
124
     * @throws \LogicException
125
     *
126
     * @return int
127
     */
128
    protected function getAttemptValue(EnvelopeInterface $envelope)
129
    {
130
        if (false === $attempt = $envelope->getHeader(self::PROPERTY_KEY)) {
131 3
            return 1;
132
        }
133 3
134 3
        $attempt = (integer) $attempt;
135 1
136
        if ($attempt < 1) {
137
            throw new \LogicException(
138 2
                sprintf('Attempt can only be a positive number, got "%s"', json_encode($attempt))
139
            );
140
        }
141
142 2
        return $attempt;
143
    }
144
}
145