Completed
Push — master ( d73237...40effa )
by Olivier
05:28 queued 11s
created

XDeathMaxLifetimeProcessor::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 9

Duplication

Lines 11
Ratio 100 %

Importance

Changes 0
Metric Value
dl 11
loc 11
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 4
1
<?php
2
3
namespace Swarrot\Processor\XDeath;
4
5
use PhpAmqpLib\Wire\AMQPArray;
6
use Psr\Log\LoggerInterface;
7
use Psr\Log\LogLevel;
8
use Psr\Log\NullLogger;
9
use Swarrot\Broker\Message;
10
use Swarrot\Processor\ProcessorInterface;
11
use Swarrot\Processor\ConfigurableInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
/**
 * Processor decorator enforcing a maximum lifetime based on the "x-death" header.
 *
 * The wrapped processor is executed normally. When it throws, the message's
 * "x-death" header is searched for the entry matching the configured queue
 * name. If the "time" of that entry is older than the "x_death_max_lifetime"
 * option (in seconds), the configured callback is invoked. Unless the callback
 * returns a non-null value, the original exception is rethrown.
 */
class XDeathMaxLifetimeProcessor implements ConfigurableInterface
{
    /**
     * @var ProcessorInterface
     */
    private $processor;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @var callable
     */
    private $callback;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param ProcessorInterface   $processor Decorated processor
     * @param string               $queueName Queue name looked up in the x-death header entries
     * @param callable             $callback  Invoked as ($exception, $message, $options) once the max lifetime is reached
     * @param LoggerInterface|null $logger    Falls back to a NullLogger when omitted
     */
    public function __construct(
        ProcessorInterface $processor,
        string $queueName,
        callable $callback,
        ?LoggerInterface $logger = null
    ) {
        $this->processor = $processor;
        $this->queueName = $queueName;
        $this->callback = $callback;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * {@inheritdoc}
     */
    public function setDefaultOptions(OptionsResolver $resolver)
    {
        $resolver
            ->setDefaults([
                'x_death_max_lifetime' => 3600,
                'x_death_max_lifetime_log_levels_map' => [],
                'x_death_max_lifetime_fail_log_levels_map' => [],
            ])
            ->setAllowedTypes('x_death_max_lifetime', 'int')
            ->setAllowedTypes('x_death_max_lifetime_log_levels_map', 'array')
            ->setAllowedTypes('x_death_max_lifetime_fail_log_levels_map', 'array');
    }

    /**
     * {@inheritdoc}
     */
    public function process(Message $message, array $options)
    {
        try {
            return $this->processor->process($message, $options);
        } catch (\Throwable $e) {
            // \Throwable already covers \Exception, so a single catch is enough.
            return $this->handleException($e, $message, $options);
        }
    }

    /**
     * Decides, from the x-death header, whether the callback must run for a failed message.
     *
     * @param \Throwable $exception Error raised by the decorated processor
     * @param Message    $message
     * @param array      $options
     *
     * @return mixed The callback's return value, when it is not null
     *
     * @throws \Throwable The given exception, in every case where the callback
     *                    is not executed or returns null
     */
    private function handleException(\Throwable $exception, Message $message, array $options)
    {
        $properties = $message->getProperties();
        if (!isset($properties['headers']['x-death'])) {
            throw $exception;
        }

        $queueXDeathHeader = $this->findQueueXDeathHeader($properties['headers']['x-death']);

        if (null === $queueXDeathHeader) {
            $this->logException(
                $exception,
                sprintf(
                    '[XDeathMaxLifetime] No x-death header found for queue name "%s". Do nothing.',
                    $this->queueName
                ),
                $options['x_death_max_lifetime_fail_log_levels_map']
            );

            throw $exception;
        }

        if (!isset($queueXDeathHeader['time'])) {
            throw $exception;
        }

        $xDeathTimestamp = $queueXDeathHeader['time'];
        // PhpAmqpLib compatibility: the time may be a date object instead of a Unix timestamp.
        if ($xDeathTimestamp instanceof \DateTimeInterface) {
            $xDeathTimestamp = $xDeathTimestamp->getTimestamp();
        }

        // Seconds left before the entry becomes older than the allowed lifetime.
        $remainLifetime = $xDeathTimestamp - (time() - $options['x_death_max_lifetime']);

        if ($remainLifetime > 0) {
            $this->logException(
                $exception,
                sprintf(
                    '[XDeathMaxLifetime] Lifetime remain %d/%d seconds.',
                    $remainLifetime,
                    $options['x_death_max_lifetime']
                ),
                $options['x_death_max_lifetime_log_levels_map']
            );

            throw $exception;
        }

        $this->logException(
            $exception,
            sprintf(
                '[XDeathMaxLifetime] Max lifetime reached. %s/%s seconds exceed. Execute the configured callback.',
                abs($remainLifetime),
                $options['x_death_max_lifetime']
            ),
            $options['x_death_max_lifetime_fail_log_levels_map']
        );

        $return = ($this->callback)($exception, $message, $options);
        if (null !== $return) {
            return $return;
        }

        throw $exception;
    }

    /**
     * Returns the first x-death entry whose "queue" equals the configured queue name.
     *
     * @param mixed $xDeathHeaders Raw "x-death" header value (AMQPArray or iterable of entries)
     *
     * @return mixed|null The matching entry, or null when there is none
     */
    private function findQueueXDeathHeader($xDeathHeaders)
    {
        // PhpAmqpLib compatibility
        if ($xDeathHeaders instanceof AMQPArray) {
            $xDeathHeaders = $xDeathHeaders->getNativeData();
        }

        foreach ($xDeathHeaders as $xDeathHeader) {
            if (isset($xDeathHeader['queue']) && $xDeathHeader['queue'] === $this->queueName) {
                return $xDeathHeader;
            }
        }

        return null;
    }

    /**
     * Logs a message at the level mapped to the exception class (WARNING by default).
     *
     * @param \Throwable $exception
     * @param string     $logMessage
     * @param array      $logLevelsMap Map of class name => log level; the first matching class wins
     */
    private function logException(\Throwable $exception, $logMessage, array $logLevelsMap)
    {
        $logLevel = LogLevel::WARNING;

        foreach ($logLevelsMap as $className => $level) {
            if ($exception instanceof $className) {
                $logLevel = $level;

                break;
            }
        }

        $this->logger->log(
            $logLevel,
            $logMessage,
            [
                'swarrot_processor' => 'x_death_max_lifetime',
                'exception' => $exception,
            ]
        );
    }
}
183