Passed
Pull Request — master (#19)
by
unknown
04:53
created

Consumer::buildExceptionContext()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 5
cts 5
cp 1
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Lamoda\QueueBundle;
6
7
use Doctrine\ORM\EntityManagerInterface;
8
use JMS\Serializer\SerializerInterface;
9
use Lamoda\QueueBundle\Entity\QueueEntityInterface;
10
use Lamoda\QueueBundle\Exception\AttemptsReachedException;
11
use Lamoda\QueueBundle\Exception\RuntimeException;
12
use Lamoda\QueueBundle\Exception\UnexpectedValueException;
13
use Lamoda\QueueBundle\Handler\JobHandler;
14
use Lamoda\QueueBundle\Service\DelayService;
15
use Lamoda\QueueBundle\Service\QueueService;
16
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
21
/**
 * AMQP consumer that resolves the queue entity referenced by an incoming
 * message and runs the job described by that entity.
 *
 * The message body is expected to be a JSON document containing an "id" key;
 * that id is handed to QueueService::getToProcess() to obtain the entity.
 */
class Consumer implements ConsumerInterface
{
    /** @var QueueService */
    protected $queueService;

    /** @var JobHandler */
    protected $jobHandler;

    /** @var SerializerInterface */
    protected $serializer;

    /** @var EntityManagerInterface */
    protected $entityManager;

    /** @var LoggerInterface */
    protected $logger;

    /** @var DelayService */
    protected $delayService;

    /**
     * @param QueueService           $queueService  loads and saves queue entities
     * @param JobHandler             $jobHandler    receives the deserialized job
     * @param SerializerInterface    $serializer    turns the entity data into a job object
     * @param EntityManagerInterface $entityManager cleared before every processed message
     * @param LoggerInterface        $logger        receives progress and failure records
     * @param DelayService           $delayService  used when a job fails with RuntimeException
     */
    public function __construct(
        QueueService $queueService,
        JobHandler $jobHandler,
        SerializerInterface $serializer,
        EntityManagerInterface $entityManager,
        LoggerInterface $logger,
        DelayService $delayService
    ) {
        $this->queueService = $queueService;
        $this->jobHandler = $jobHandler;
        $this->serializer = $serializer;
        $this->entityManager = $entityManager;
        $this->logger = $logger;
        $this->delayService = $delayService;
    }

    /**
     * Decodes the message body, loads the referenced queue entity and processes it.
     *
     * A body that is not valid JSON, or that has no non-null "id" key, is logged
     * at alert level and rejected. An AttemptsReachedException raised while
     * loading or processing the entity is logged at notice level and rejected.
     * Any other exception is not handled here and propagates to the caller.
     *
     * @param AMQPMessage $message
     *
     * @throws \Exception
     *
     * @return int self::MSG_REJECT for the handled failures above, otherwise the result of doExecute()
     */
    public function execute(AMQPMessage $message): int
    {
        try {
            $data = json_decode($message->body, true);
            $jsonErrorCode = json_last_error();

            if (JSON_ERROR_NONE !== $jsonErrorCode) {
                throw new UnexpectedValueException('json_decode error: ' . json_last_error_msg(), $jsonErrorCode);
            }

            if (!isset($data['id'])) {
                throw new UnexpectedValueException(ConstantMessage::AMQP_DATA_DAMAGED);
            }

            // Clear the entity manager before the entity for this message is loaded.
            $this->entityManager->clear();

            return $this->doExecute(
                $this->queueService->getToProcess($data['id'])
            );
        } catch (UnexpectedValueException $exception) {
            $this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));
        } catch (AttemptsReachedException $exception) {
            $this->logger->notice($exception->getMessage(), $this->getMessageLogParams($message));
        }

        return self::MSG_REJECT;
    }

    /**
     * Runs the job of the given queue entity and records the outcome on the entity.
     *
     * - success: the entity is marked done;
     * - RuntimeException: the failure is logged at info level and the entity is
     *   passed to DelayService::delayQueue();
     * - any other Throwable: the failure is logged at alert level and the entity
     *   is marked as errored.
     *
     * The entity is saved and an END record is logged in every case, including
     * when an exception escapes one of the catch branches.
     *
     * @param QueueEntityInterface $queueEntity
     *
     * @return int self::MSG_ACK whenever no exception escapes this method
     */
    protected function doExecute(QueueEntityInterface $queueEntity): int
    {
        $queueName = $queueEntity->getName();

        // Captured once up front so the START and END records carry identical context.
        $logContext = [
            'queue_id' => $queueEntity->getId(),
            'message_data' => $queueEntity->getData(),
        ];

        $this->logger->info('START: ' . $queueName, $logContext);

        try {
            $this->executeQueue($queueEntity);
            $queueEntity->setDone();
        } catch (RuntimeException $exception) {
            $this->logger->info(
                sprintf(ConstantMessage::CONSUMER_JOB_EXECUTING_FAILED, $queueName),
                $this->buildExceptionContext($exception, $queueEntity)
            );

            $this->delayService->delayQueue($queueEntity);
        } catch (Throwable $exception) {
            $this->logger->alert(
                sprintf(ConstantMessage::CONSUMER_JOB_EXECUTING_UNPREDICTABLE_FAILED, $queueName),
                $this->buildExceptionContext($exception, $queueEntity)
            );
            $queueEntity->setError();
        } finally {
            $this->queueService->save($queueEntity);

            $this->logger->info('END:' . $queueName, $logContext);
        }

        return self::MSG_ACK;
    }

    /**
     * Rebuilds the job object from the entity data and passes it to the job handler.
     *
     * The entity data is JSON-encoded and then deserialized into the class named
     * by QueueEntityInterface::getJobName().
     *
     * @param QueueEntityInterface $queueEntity
     *
     * @throws \UnexpectedValueException when the entity data cannot be encoded as JSON
     */
    protected function executeQueue(QueueEntityInterface $queueEntity): void
    {
        $data = json_encode($queueEntity->getData());

        if (false === $data) {
            // json_encode() returns false on failure; passing that on to the
            // serializer would hide the real cause. The global SPL exception is
            // used deliberately: it is never an instance of the bundle's
            // RuntimeException, so doExecute() handles it in its Throwable
            // branch (entity marked as errored) instead of scheduling a retry.
            throw new \UnexpectedValueException('json_encode error: ' . json_last_error_msg(), json_last_error());
        }

        /** @var QueueInterface $job */
        $job = $this->serializer->deserialize($data, $queueEntity->getJobName(), 'json');

        $this->jobHandler->handle($job);
    }

    /**
     * Builds the log context used when a message is rejected.
     *
     * @param AMQPMessage $message
     *
     * @return array the raw message body under the "amqp_message_body" key
     */
    private function getMessageLogParams(AMQPMessage $message): array
    {
        return [
            'amqp_message_body' => $message->body,
        ];
    }

    /**
     * Builds the log context describing a job failure.
     *
     * @param Throwable            $exception the failure being reported
     * @param QueueEntityInterface $queue     the entity whose job failed
     *
     * @return array exception class and message plus the entity id and job name
     */
    private function buildExceptionContext(Throwable $exception, QueueEntityInterface $queue): array
    {
        return [
            'exception' => get_class($exception),
            'message' => $exception->getMessage(),
            'id' => $queue->getId(),
            'job_name' => $queue->getJobName(),
        ];
    }
}
159