Completed
Push — master ( 51c8d9...38822b )
by Lexey
02:42
created

Publisher::releaseQueues()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 26
ccs 16
cts 16
cp 1
rs 9.504
c 0
b 0
f 0
cc 4
nc 4
nop 1
crap 4
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Lamoda\QueueBundle;
6
7
use Lamoda\QueueBundle\Entity\QueueEntityInterface;
8
use Lamoda\QueueBundle\Exception\UnexpectedValueException;
9
use Lamoda\QueueBundle\Service\DelayService;
10
use Lamoda\QueueBundle\Service\QueueService;
11
use OldSound\RabbitMqBundle\RabbitMq\Producer;
12
use Psr\Log\LoggerInterface;
13
use Throwable;
14
15
/**
 * Collects queue entities and publishes them through the producer on release().
 *
 * Queues are first gathered with prepareJobForPublish() / prepareQueueForPublish()
 * and are only sent when release() is called while no transaction is active.
 */
class Publisher
{
    /** @var Producer Producer every queue message is published through */
    protected $producer;

    /** @var QueueService Creates and saves queue entities, reports transaction state */
    protected $queueService;

    /** @var LoggerInterface Receives an alert when publishing a queue fails */
    protected $logger;

    /** @var DelayService Delays a queue whose publication failed */
    protected $delayService;

    /** @var QueueEntityInterface[] Queues collected for the next release() call */
    protected $publishQueues = [];

    public function __construct(
        Producer $producer,
        QueueService $queueService,
        LoggerInterface $logger,
        DelayService $delayService
    ) {
        $this->delayService = $delayService;
        $this->logger = $logger;
        $this->queueService = $queueService;
        $this->producer = $producer;
    }

    /**
     * Creates a queue entity for the given job and collects it for publishing.
     *
     * @param QueueInterface $queueable Job to create the queue entity from
     *
     * @throws UnexpectedValueException
     *
     * @return Publisher The same publisher, to allow call chaining
     */
    public function prepareJobForPublish(QueueInterface $queueable): self
    {
        $createdQueue = $this->queueService->createQueue($queueable);
        $this->prepareQueueForPublish($createdQueue);

        return $this;
    }

    /**
     * Collects an existing queue entity; nothing is published until release().
     *
     * @param QueueEntityInterface $queue Entity to append to the collected queues
     *
     * @return Publisher The same publisher, to allow call chaining
     */
    public function prepareQueueForPublish(QueueEntityInterface $queue): self
    {
        $this->publishQueues[] = $queue;

        return $this;
    }

    /**
     * Publishes a single queue: the body is a JSON object holding the entity id,
     * the routing key is the entity name.
     *
     * @param QueueEntityInterface $queueEntity  Entity to announce
     * @param int                  $deliveryMode Value sent as the "delivery_mode" message property
     */
    protected function publishQueue(QueueEntityInterface $queueEntity, int $deliveryMode = 2): void
    {
        $body = json_encode(['id' => $queueEntity->getId()]);
        $routingKey = $queueEntity->getName();
        $properties = ['delivery_mode' => $deliveryMode];

        $this->producer->publish($body, $routingKey, $properties);
    }

    /**
     * Publishes all collected queues and empties the storage.
     *
     * Does nothing when no queues were collected. While a transaction is active
     * the collected queues are kept untouched, because some of the queue entities
     * could be not in database yet.
     */
    public function release(): void
    {
        // The transaction state is only queried when there is something to release.
        if (0 === count($this->publishQueues) || $this->queueService->isTransactionActive()) {
            return;
        }

        $this->releaseQueues($this->publishQueues);
        $this->clearStorage();
    }

    /**
     * Forgets every collected queue.
     */
    private function clearStorage(): void
    {
        $this->publishQueues = [];
    }

    /**
     * Marks each non-scheduled queue as new, saves it and publishes it.
     *
     * A failed publication does not stop the loop: the queue is delayed, saved
     * again and an alert with the failure details is logged.
     *
     * @param QueueEntityInterface[] $queues
     */
    private function releaseQueues(array $queues): void
    {
        foreach ($queues as $pendingQueue) {
            // Scheduled queues are not published from here.
            if ($pendingQueue->isScheduled()) {
                continue;
            }

            $pendingQueue->setNew();
            $this->queueService->save($pendingQueue);

            try {
                $this->publishQueue($pendingQueue);
            } catch (Throwable $failure) {
                $this->delayService->delayQueue($pendingQueue);
                $this->queueService->save($pendingQueue);

                $context = [
                    'exception' => get_class($failure),
                    'message' => $failure->getMessage(),
                    'trace' => $failure->getTraceAsString(),
                ];
                $this->logger->alert(ConstantMessage::AMQP_BROKER_IS_DOWN, $context);
            }
        }
    }
}
124