Completed
Pull Request — master (#7)
by Lexey
03:27
created

Consumer::getMessageLogParams()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Lamoda\QueueBundle;
6
7
use Doctrine\ORM\EntityManagerInterface;
8
use JMS\Serializer\SerializerInterface;
9
use Lamoda\QueueBundle\Entity\QueueEntityInterface;
10
use Lamoda\QueueBundle\Exception\AttemptsReachedException;
11
use Lamoda\QueueBundle\Exception\RuntimeException;
12
use Lamoda\QueueBundle\Exception\UnexpectedValueException;
13
use Lamoda\QueueBundle\Handler\JobHandler;
14
use Lamoda\QueueBundle\Service\DelayService;
15
use Lamoda\QueueBundle\Service\QueueService;
16
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
21
class Consumer implements ConsumerInterface
22
{
23
    /** @var QueueService */
24
    protected $queueService;
25
26
    /** @var JobHandler */
27
    protected $jobHandler;
28
29
    /** @var SerializerInterface */
30
    protected $serializer;
31
32
    /** @var EntityManagerInterface */
33
    protected $entityManager;
34
35
    /** @var LoggerInterface */
36
    protected $logger;
37
38
    /** @var DelayService */
39
    protected $delayService;
40
41
    public function __construct(
42
        QueueService $queueService,
43
        JobHandler $jobHandler,
44
        SerializerInterface $serializer,
45
        EntityManagerInterface $entityManager,
46
        LoggerInterface $logger,
47
        DelayService $delayService
48
    ) {
49
        $this->queueService = $queueService;
50
        $this->jobHandler = $jobHandler;
51
        $this->serializer = $serializer;
52
        $this->entityManager = $entityManager;
53
        $this->logger = $logger;
54
        $this->delayService = $delayService;
55
    }
56
57
    /**
58
     * @param AMQPMessage $message
59
     *
60
     * @throws \Exception
61
     *
62
     * @return int
63
     */
64 5
    public function execute(AMQPMessage $message): int
65
    {
66
        try {
67 5
            $data = json_decode($message->body, true);
68 5
            $jsonErrorCode = json_last_error();
69
70 5
            if (JSON_ERROR_NONE !== $jsonErrorCode) {
71 1
                throw new UnexpectedValueException('json_decode error: ' . json_last_error_msg(), $jsonErrorCode);
72
            }
73
74 4
            if (!isset($data['id'])) {
75 1
                throw new UnexpectedValueException(ConstantMessage::AMQP_DATA_DAMAGED);
76
            }
77
78 3
            $this->entityManager->clear();
79
80 3
            return $this->doExecute(
81 3
                $this->queueService->getToProcess($data['id'])
82
            );
83 4
        } catch (UnexpectedValueException | AttemptsReachedException $exception) {
84 4
            $this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));
85
86 4
            return self::MSG_REJECT;
87
        }
88
    }
89
90 3
    protected function doExecute(QueueEntityInterface $queueEntity): int
91
    {
92 3
        $queueData = $queueEntity->getData();
93 3
        $queueName = $queueEntity->getName();
94 3
        $queueId = $queueEntity->getId();
95
96 3
        $this->logger->info('START: ' . $queueName, [
97 3
            'queue_id' => $queueId, 'message_data' => $queueData,
98
        ]);
99
100
        try {
101 3
            $this->executeQueue($queueEntity);
102 1
            $queueEntity->setDone();
103 2
        } catch (RuntimeException $exception) {
104 1
            $this->logger->info(
105 1
                sprintf(ConstantMessage::CONSUMER_JOB_EXECUTING_FAILED, $queueName),
106 1
                $this->buildExceptionContext($exception, $queueEntity)
107
            );
108
109 1
            $this->delayService->delayQueue($queueEntity);
110 1
        } catch (Throwable $exception) {
111 1
            $this->logger->alert(
112 1
                sprintf(ConstantMessage::CONSUMER_JOB_EXECUTING_UNPREDICTABLE_FAILED, $queueName),
113 1
                $this->buildExceptionContext($exception, $queueEntity)
114
            );
115 1
            $queueEntity->setError();
116 3
        } finally {
117 3
            $this->queueService->save($queueEntity);
118
119 3
            $this->logger->info(
120 3
                'END:' . $queueName,
121
                [
122 3
                    'queue_id' => $queueId, 'message_data' => $queueData,
123
                ]
124
            );
125
        }
126
127 3
        return self::MSG_ACK;
128
    }
129
130
    protected function executeQueue(QueueEntityInterface $queueEntity): void
131
    {
132
        $data = json_encode($queueEntity->getData());
133
134
        /** @var QueueInterface $job */
135
        $job = $this->serializer->deserialize($data, $queueEntity->getJobName(), 'json');
136
137
        $this->jobHandler->handle($job);
138
    }
139
140 4
    private function getMessageLogParams(AMQPMessage $message): array
141
    {
142
        return [
143 4
            'amqp_message_body' => $message->body,
144
        ];
145
    }
146
147 2
    private function buildExceptionContext(Throwable $exception, QueueEntityInterface $queue): array
148
    {
149
        return [
150 2
            'exception' => get_class($exception),
151 2
            'message' => $exception->getMessage(),
152 2
            'id' => $queue->getId(),
153 2
            'job_name' => $queue->getJobName(),
154
        ];
155
    }
156
}
157