Completed
Push — master ( 5fca0d...2a987b )
by Quim
02:16
created

QueueWriter::write()   B

Complexity

Conditions 6
Paths 24

Size

Total Lines 32
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 32
rs 8.439
c 0
b 0
f 0
cc 6
eloc 24
nc 24
nop 1
1
<?php
2
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
3
4
use Cmp\Queues\Domain\Queue\Exception\WriterException;
5
use Cmp\Queues\Domain\Queue\Message;
6
use Cmp\Queues\Domain\Queue\QueueWriter as DomainQueueWriter;
7
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Connection\AMQPLazyConnection;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use Psr\Log\LoggerInterface;
12
13
class QueueWriter implements DomainQueueWriter
14
{
15
    /**
16
     * @var AMQPLazyConnection
17
     */
18
    protected $connection;
19
20
    /**
21
     * @var ExchangeConfig
22
     */
23
    protected $exchangeConfig;
24
25
    /**
26
     * @var LoggerInterface
27
     */
28
    protected $logger;
29
30
    /**
31
     * @var AMQPChannel
32
     */
33
    protected $channel;
34
35
    /**
36
     * @var DelayedQueueWriter[]
37
     */
38
    protected $delayedQueueWriterRegistry = [];
39
40
    /**
41
     * QueueWriter constructor.
42
     * @param AMQPLazyConnection $connection
43
     * @param ExchangeConfig $exchangeConfig
44
     * @param LoggerInterface $logger
45
     */
46
    public function __construct(
47
        AMQPLazyConnection $connection,
48
        ExchangeConfig $exchangeConfig,
49
        LoggerInterface $logger
50
    )
51
    {
52
        $this->connection = $connection;
53
        $this->exchangeConfig = $exchangeConfig;
54
        $this->logger = $logger;
55
    }
56
57
    /**
58
     * @param Message[] $messages
59
     * @throws WriterException
60
     * @return null
61
     */
62
    public function write(array $messages)
63
    {
64
        $this->initialize();
65
        try {
66
            $messagesWithDelay = [];
67
            foreach($messages as $message) {
68
                if($message->getDelay() > 0) {
69
                    $messagesWithDelay[$message->getDelay()][] = $message;
70
                    continue;
71
                }
72
                $encodedMessage = json_encode($message);
73
                $this->logger->debug('Writing:' . $encodedMessage);
74
                $msg = new AMQPMessage($encodedMessage, array('delivery_mode' => 2));
75
                $this->channel->batch_basic_publish($msg, $this->exchangeConfig->getName(), $message->getName());
76
            }
77
            $this->channel->publish_batch();
78
            foreach($messagesWithDelay as $delay => $delayedMessages) {
79
                if(!isset($this->delayedQueueWriterRegistry[$delay])) {
80
                    $this->delayedQueueWriterRegistry[$delay] = new DelayedQueueWriter(
81
                        $this->exchangeConfig->getName(),
82
                        $delay,
83
                        $this->channel,
84
                        $this->logger
85
                    );
86
                }
87
                $this->delayedQueueWriterRegistry[$delay]->write($delayedMessages);
88
            }
89
        } catch(\Exception $exception) {
90
            $this->logger->error('Error writing messages: '.$exception->getMessage());
91
            throw new WriterException($exception->getMessage(), $exception->getCode());
92
        }
93
    }
94
95
    /**
96
     * @throws WriterException
97
     */
98
    protected function initialize()
99
    {
100
        if($this->channel) {
101
            return;
102
        }
103
        $this->logger->info('Connecting to RabbitMQ');
104
        try {
105
            $this->channel = $this->connection->channel();
106
            $this->channel->exchange_declare(
107
                $this->exchangeConfig->getName(),
108
                $this->exchangeConfig->getType(),
109
                $this->exchangeConfig->getPassive(),
110
                $this->exchangeConfig->getDurable(),
111
                $this->exchangeConfig->getAutoDelete()
112
            );
113
        } catch (\ErrorException $exception) {
114
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
115
            throw new WriterException($exception->getMessage(), $exception->getCode());
116
        }
117
    }
118
}