Completed
Pull Request — master (#20)
by Anton
03:59
created

Publisher::clearStorage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Lamoda\QueueBundle;
6
7
use Lamoda\QueueBundle\Entity\QueueEntityInterface;
8
use Lamoda\QueueBundle\Exception\UnexpectedValueException;
9
use Lamoda\QueueBundle\Service\DelayService;
10
use Lamoda\QueueBundle\Service\QueueService;
11
use OldSound\RabbitMqBundle\RabbitMq\Producer;
12
use Psr\Log\LoggerInterface;
13
use Throwable;
14
15
/**
 * Collects queue entities and, on release(), persists them and publishes
 * their ids through the injected producer.
 */
class Publisher
{
    /**
     * Producer whose publish() method receives the outgoing messages.
     *
     * @var Producer
     */
    protected $producer;

    /**
     * Service used to create and save queue entities and to check for an active transaction.
     *
     * @var QueueService
     */
    protected $queueService;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Service that delays a queue entity whose publication failed.
     *
     * @var DelayService
     */
    protected $delayService;

    /**
     * Queue entities waiting for the next release() call.
     *
     * @var QueueEntityInterface[]
     */
    protected $publishQueues = [];

    public function __construct(
        Producer $producer,
        QueueService $queueService,
        LoggerInterface $logger,
        DelayService $delayService
    ) {
        $this->delayService = $delayService;
        $this->logger = $logger;
        $this->queueService = $queueService;
        $this->producer = $producer;
    }

    /**
     * Creates a queue entity for the given job, registers it for publishing
     * and writes an info log entry describing the created entity.
     *
     * @param QueueInterface $queueable job to create a queue entity for
     *
     * @throws UnexpectedValueException
     *
     * @return Publisher this instance, for chaining
     */
    public function prepareJobForPublish(QueueInterface $queueable): self
    {
        $createdQueue = $this->queueService->createQueue($queueable);

        $this->prepareQueueForPublish($createdQueue);

        // Both 'tracking_id' and 'message' carry the entity id, as in the original log record.
        $logContext = [
            'tracking_id' => $createdQueue->getId(),
            'message' => $createdQueue->getId(),
            'name' => $createdQueue->getName(),
            'exchange' => $createdQueue->getExchange(),
            'job_name' => $createdQueue->getJobName(),
            'data' => $createdQueue->getData(),
        ];
        $this->logger->info('Queue was created', $logContext);

        return $this;
    }

    /**
     * Registers an already existing queue entity for the next release() call.
     *
     * @param QueueEntityInterface $queue entity to remember
     *
     * @return Publisher this instance, for chaining
     */
    public function prepareQueueForPublish(QueueEntityInterface $queue): self
    {
        $this->publishQueues[] = $queue;

        return $this;
    }

    /**
     * Sends a single message for the given entity through the producer.
     *
     * The message body is a JSON object containing only the entity id; the
     * entity name is passed as the second argument of Producer::publish().
     *
     * @param QueueEntityInterface $queueEntity entity to announce
     * @param int                  $deliveryMode value sent as the "delivery_mode" message property
     *                                          (NOTE(review): 2 presumably means persistent delivery - confirm)
     */
    protected function publishQueue(QueueEntityInterface $queueEntity, int $deliveryMode = 2): void
    {
        $messageBody = json_encode(['id' => $queueEntity->getId()]);
        $routingKey = $queueEntity->getName();
        $properties = ['delivery_mode' => $deliveryMode];

        $this->producer->publish($messageBody, $routingKey, $properties);
    }

    /**
     * Publishes every registered queue entity and then empties the internal storage.
     *
     * Nothing happens when no entities are registered, or while the queue
     * service reports an active transaction: in that case some entities could
     * be missing from the database, so they are kept for a later call.
     */
    public function release(): void
    {
        // Short-circuit keeps the transaction check from running when there is nothing to release.
        if (0 === count($this->publishQueues) || $this->queueService->isTransactionActive()) {
            return;
        }

        $this->releaseQueues($this->publishQueues);
        $this->clearStorage();
    }

    /**
     * Forgets all queue entities registered so far.
     */
    private function clearStorage(): void
    {
        $this->publishQueues = [];
    }

    /**
     * Marks each non-scheduled entity as new, saves it and publishes it.
     *
     * When publishing throws, the entity is handed to the delay service, saved
     * again and an alert is logged; processing then continues with the next entity.
     *
     * @param QueueEntityInterface[] $queues entities to process
     */
    private function releaseQueues(array $queues): void
    {
        foreach ($queues as $pendingQueue) {
            // Scheduled entities are left untouched here.
            if (!$pendingQueue->isScheduled()) {
                $pendingQueue->setNew();
                $this->queueService->save($pendingQueue);

                try {
                    $this->publishQueue($pendingQueue);
                } catch (Throwable $failure) {
                    $this->delayService->delayQueue($pendingQueue);
                    $this->queueService->save($pendingQueue);

                    $failureContext = [
                        'exception' => get_class($failure),
                        'message' => $failure->getMessage(),
                        'trace' => $failure->getTraceAsString(),
                    ];
                    $this->logger->alert(ConstantMessage::AMQP_BROKER_IS_DOWN, $failureContext);
                }
            }
        }
    }
}
136