Completed
Pull Request — master (#5)
by Quim
04:42 queued 02:24
created

QueueWriter::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 3
1
<?php
2
namespace Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
3
4
use Domain\Queue\Exception\WriterException;
5
use Domain\Queue\Message;
6
use Domain\Queue\QueueWriter as DomainQueueWriter;
7
use Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Connection\AMQPLazyConnection;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use Psr\Log\LoggerInterface;
12
13
/**
 * Publishes domain messages to a RabbitMQ exchange through php-amqplib.
 *
 * The AMQP channel is opened lazily on the first call to write(). Messages
 * whose delay is greater than zero are grouped by delay value and handed to a
 * DelayedQueueWriter; every other message is published directly to the
 * configured exchange as part of a single batch.
 */
class QueueWriter implements DomainQueueWriter
{
    /**
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * @var ExchangeConfig
     */
    protected $exchangeConfig;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Channel used for publishing; stays unset until initialize() has both
     * opened the channel and declared the exchange.
     *
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * QueueWriter constructor.
     *
     * Only stores its collaborators; no channel is opened here.
     *
     * @param AMQPLazyConnection $connection
     * @param ExchangeConfig $exchangeConfig
     * @param LoggerInterface $logger
     */
    public function __construct(
        AMQPLazyConnection $connection,
        ExchangeConfig $exchangeConfig,
        LoggerInterface $logger
    )
    {
        $this->connection = $connection;
        $this->exchangeConfig = $exchangeConfig;
        $this->logger = $logger;
    }

    /**
     * Publishes the given messages.
     *
     * Messages without a delay are JSON-encoded and published in one batch,
     * using the message name as routing key. Messages with a delay are
     * written afterwards through one DelayedQueueWriter per distinct delay.
     *
     * @param Message[] $messages
     * @throws WriterException When initialization, encoding or publishing fails.
     * @return void
     */
    public function write(array $messages)
    {
        $this->initialize();
        try {
            $immediateMessages = [];
            $messagesWithDelay = [];
            foreach ($messages as $message) {
                if ($message->getDelay() > 0) {
                    $messagesWithDelay[$message->getDelay()][] = $message;
                    continue;
                }
                $encodedMessage = json_encode($message);
                if (false === $encodedMessage) {
                    // Fail loudly instead of publishing an empty body. Encoding
                    // happens before anything is queued on the channel, so a
                    // failure here leaves no half-filled batch behind.
                    throw new \RuntimeException(
                        'Unable to JSON-encode message (json error code ' . json_last_error() . ')'
                    );
                }
                $immediateMessages[] = [$message->getName(), $encodedMessage];
            }

            foreach ($immediateMessages as $immediateMessage) {
                list($routingKey, $encodedMessage) = $immediateMessage;
                $this->logger->debug('Writing:' . $encodedMessage);
                // delivery_mode 2 asks the broker to persist the message.
                $msg = new AMQPMessage($encodedMessage, array('delivery_mode' => 2));
                $this->channel->batch_basic_publish($msg, $this->exchangeConfig->getName(), $routingKey);
            }
            $this->channel->publish_batch();

            foreach ($messagesWithDelay as $delay => $delayedMessages) {
                $delayedQueueWriter = new DelayedQueueWriter(
                    $this->exchangeConfig->getName(),
                    $delay,
                    $this->channel,
                    $this->logger
                );
                $delayedQueueWriter->write($delayedMessages);
            }
        } catch (\Exception $exception) {
            $this->logger->error('Error writing messages: '.$exception->getMessage());
            throw new WriterException($exception->getMessage(), $exception->getCode());
        }
    }

    /**
     * Opens the channel and declares the configured exchange, once.
     *
     * Does nothing when a channel has already been stored by a previous
     * successful call.
     *
     * @throws WriterException When the channel cannot be opened or the
     *                         exchange cannot be declared.
     */
    protected function initialize()
    {
        if ($this->channel) {
            return;
        }
        $this->logger->info('Connecting to RabbitMQ');
        try {
            $channel = $this->connection->channel();
            $channel->exchange_declare(
                $this->exchangeConfig->getName(),
                $this->exchangeConfig->getType(),
                $this->exchangeConfig->getPassive(),
                $this->exchangeConfig->getDurable(),
                $this->exchangeConfig->getAutoDelete()
            );
        } catch (\Exception $exception) {
            // Catch every exception (not only \ErrorException) so callers only
            // ever see the documented WriterException, matching write().
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
            throw new WriterException($exception->getMessage(), $exception->getCode());
        }
        // Store the channel only after the exchange was declared; otherwise a
        // failed declaration would leave a channel behind that the guard above
        // treats as fully initialized on the next call.
        $this->channel = $channel;
    }
}