1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Swarrot\Processor\XDeath; |
4
|
|
|
|
5
|
|
|
use PhpAmqpLib\Wire\AMQPArray; |
6
|
|
|
use Psr\Log\LoggerInterface; |
7
|
|
|
use Psr\Log\LogLevel; |
8
|
|
|
use Psr\Log\NullLogger; |
9
|
|
|
use Swarrot\Broker\Message; |
10
|
|
|
use Swarrot\Processor\ProcessorInterface; |
11
|
|
|
use Swarrot\Processor\ConfigurableInterface; |
12
|
|
|
use Symfony\Component\OptionsResolver\OptionsResolver; |
13
|
|
|
|
14
|
|
|
/**
 * Decorates another processor and, when that processor throws, inspects the
 * message's "x-death" header for the configured queue.
 *
 * If the death timestamp recorded for that queue is older than the
 * "x_death_max_lifetime" option (in seconds), the configured callback is
 * invoked. Unless the callback returns a non-null value, the original
 * exception is always rethrown.
 */
class XDeathMaxLifetimeProcessor implements ConfigurableInterface
{
    /**
     * @var ProcessorInterface
     */
    private $processor;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @var callable
     */
    private $callback;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param ProcessorInterface   $processor the decorated processor
     * @param string               $queueName queue name looked up in the x-death entries
     * @param callable             $callback  called with ($exception, $message, $options)
     *                                        once the max lifetime is reached
     * @param LoggerInterface|null $logger    defaults to a NullLogger when omitted
     */
    public function __construct(
        ProcessorInterface $processor,
        string $queueName,
        callable $callback,
        LoggerInterface $logger = null
    ) {
        $this->processor = $processor;
        $this->queueName = $queueName;
        $this->callback = $callback;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * {@inheritdoc}
     *
     * Registers the max lifetime (integer, 3600 by default) and the two
     * exception-class => log-level maps (empty by default).
     */
    public function setDefaultOptions(OptionsResolver $resolver)
    {
        $resolver
            ->setDefaults([
                'x_death_max_lifetime' => 3600,
                'x_death_max_lifetime_log_levels_map' => [],
                'x_death_max_lifetime_fail_log_levels_map' => [],
            ])
            ->setAllowedTypes('x_death_max_lifetime', 'int')
            ->setAllowedTypes('x_death_max_lifetime_log_levels_map', 'array')
            ->setAllowedTypes('x_death_max_lifetime_fail_log_levels_map', 'array');
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the decorated processor; anything it throws is routed
     * through handleException().
     */
    public function process(Message $message, array $options)
    {
        try {
            return $this->processor->process($message, $options);
        } catch (\Throwable $e) {
            // \Throwable already covers every \Exception, so a single catch is enough.
            return $this->handleException($e, $message, $options);
        }
    }

    /**
     * Decides, from the x-death header of the failed message, whether the
     * configured callback must be executed.
     *
     * @param \Exception|\Throwable $exception the error raised by the decorated processor
     * @param Message               $message
     * @param array                 $options
     *
     * @return mixed the callback's return value when it is not null
     *
     * @throws \Exception|\Throwable the original exception in every other case
     */
    private function handleException($exception, Message $message, array $options)
    {
        $properties = $message->getProperties();

        // Without an x-death header there is nothing to evaluate.
        if (!isset($properties['headers']['x-death'])) {
            throw $exception;
        }

        $queueXDeathHeader = $this->findQueueXDeathHeader($properties['headers']['x-death']);

        if (null === $queueXDeathHeader) {
            $this->logException(
                $exception,
                sprintf(
                    '[XDeathMaxLifetime] No x-death header found for queue name "%s". Do nothing.',
                    $this->queueName
                ),
                $options['x_death_max_lifetime_fail_log_levels_map']
            );

            throw $exception;
        }

        // An entry without a death time cannot be aged; rethrow silently.
        if (!isset($queueXDeathHeader['time'])) {
            throw $exception;
        }

        $xDeathTimestamp = $queueXDeathHeader['time'];
        // PhpAmqpLib compatibility: the time may be a date object rather than
        // a UNIX timestamp. Accept any DateTimeInterface implementation.
        if ($xDeathTimestamp instanceof \DateTimeInterface) {
            $xDeathTimestamp = $xDeathTimestamp->getTimestamp();
        }

        // Seconds left before the death time falls outside the allowed window.
        $remainLifetime = $xDeathTimestamp - (time() - $options['x_death_max_lifetime']);

        if ($remainLifetime > 0) {
            $this->logException(
                $exception,
                sprintf(
                    '[XDeathMaxLifetime] Lifetime remain %d/%d seconds.',
                    $remainLifetime,
                    $options['x_death_max_lifetime']
                ),
                $options['x_death_max_lifetime_log_levels_map']
            );

            throw $exception;
        }

        $this->logException(
            $exception,
            sprintf(
                '[XDeathMaxLifetime] Max lifetime reached. %s/%s seconds exceed. Execute the configured callback.',
                abs($remainLifetime),
                $options['x_death_max_lifetime']
            ),
            $options['x_death_max_lifetime_fail_log_levels_map']
        );

        $return = \call_user_func($this->callback, $exception, $message, $options);
        if (null !== $return) {
            // A non-null callback result replaces the rethrow.
            return $return;
        }

        throw $exception;
    }

    /**
     * Returns the first x-death entry whose "queue" equals the configured
     * queue name, or null when there is none.
     *
     * @param mixed $xDeathHeaders raw value of the "x-death" header
     *
     * @return mixed|null
     */
    private function findQueueXDeathHeader($xDeathHeaders)
    {
        // PhpAmqpLib compatibility
        if ($xDeathHeaders instanceof AMQPArray) {
            $xDeathHeaders = $xDeathHeaders->getNativeData();
        }

        // A malformed (non-iterable) header is treated as "no entry found"
        // instead of triggering a PHP warning in the loop below.
        if (!\is_array($xDeathHeaders) && !$xDeathHeaders instanceof \Traversable) {
            return null;
        }

        foreach ($xDeathHeaders as $xDeathHeader) {
            if (isset($xDeathHeader['queue']) && $xDeathHeader['queue'] === $this->queueName) {
                return $xDeathHeader;
            }
        }

        return null;
    }

    /**
     * Logs the message at the level mapped to the exception's class.
     *
     * The map is scanned in order and the first class the exception is an
     * instance of wins; LogLevel::WARNING is used when nothing matches.
     *
     * @param \Exception|\Throwable $exception
     * @param string                $logMessage
     * @param array                 $logLevelsMap class name => log level
     */
    private function logException($exception, $logMessage, array $logLevelsMap)
    {
        $logLevel = LogLevel::WARNING;

        foreach ($logLevelsMap as $className => $level) {
            if ($exception instanceof $className) {
                $logLevel = $level;

                break;
            }
        }

        $this->logger->log(
            $logLevel,
            $logMessage,
            [
                'swarrot_processor' => 'x_death_max_lifetime',
                'exception' => $exception,
            ]
        );
    }
}
183
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.