Completed
Push — master ( 5fca0d...2a987b )
by Quim
02:16
created

DelayedQueueWriter::initialize()   B

Complexity

Conditions 2
Paths 6

Size

Total Lines 28
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 28
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 20
nc 6
nop 0
1
<?php
2
/**
3
 * Created by PhpStorm.
4
 * User: quimmanrique
5
 * Date: 15/02/17
6
 * Time: 12:00
7
 */
8
9
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
10
11
12
use Cmp\Queues\Domain\Queue\Exception\WriterException;
13
use Cmp\Queues\Domain\Queue\Message;
14
use Cmp\Queues\Domain\Queue\QueueWriter as DomainQueueWriter;
15
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ConnectionConfig;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Connection\AMQPLazyConnection;
18
use PhpAmqpLib\Message\AMQPMessage;
19
use Psr\Log\LoggerInterface;
20
21
/**
 * Queue writer that publishes messages through a per-delay "holding" exchange/queue pair.
 *
 * Messages are published to a fanout exchange bound to a queue declared with a
 * message TTL and a dead-letter exchange set to the real target exchange, so the
 * broker forwards each message to the target once the TTL has elapsed.
 */
class DelayedQueueWriter implements DomainQueueWriter
{
    const DELAY_QUEUE_PREFIX = 'Delay';

    /**
     * Receives debug/info/error traces for every broker operation.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Never assigned nor read inside this class; left in place because it is
     * protected and may be referenced by subclasses.
     *
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * Channel used to declare the delay topology and to publish the batch.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Delay applied to messages. It is multiplied by 1000 to build the
     * millisecond queue arguments, i.e. it is treated as a number of seconds.
     *
     * @var int
     */
    protected $delay;

    /**
     * Exchange that receives the messages once the delay has elapsed
     * (configured as the delay queue's dead-letter exchange).
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * Name of the intermediate exchange: prefix + delay + target exchange name.
     *
     * @var string
     */
    protected $delayedExchangeName;

    /**
     * DelayedQueueWriter constructor.
     *
     * @param string          $exchangeName Target exchange for the delayed messages
     * @param int             $delay        Delay, treated as seconds (see $delay)
     * @param AMQPChannel     $channel      Channel used for declarations and publishing
     * @param LoggerInterface $logger       Logger for traces and errors
     */
    public function __construct(
        $exchangeName,
        $delay,
        AMQPChannel $channel,
        LoggerInterface $logger
    ) {
        $this->delay = $delay;
        $this->exchangeName = $exchangeName;
        $this->delayedExchangeName = self::DELAY_QUEUE_PREFIX.$this->delay.$this->exchangeName;
        $this->logger = $logger;
        $this->channel = $channel;
    }

    /**
     * Publishes the given messages, as one batch, to the delayed exchange.
     *
     * The delay topology is (re)declared before every non-empty write: the delay
     * queue is declared with an "x-expires" argument, so it may no longer exist
     * since the previous call.
     *
     * NOTE(review): if a failure happens midway through the loop, messages already
     * handed to batch_basic_publish() stay buffered in the channel — confirm whether
     * the caller discards the channel after a WriterException.
     *
     * @param Message[] $messages
     * @return void
     * @throws WriterException When the topology cannot be declared, a message cannot
     *                         be JSON-encoded, or publishing fails
     */
    public function write(array $messages)
    {
        // Nothing to publish: avoid declaring broker resources for an empty batch.
        if (empty($messages)) {
            return;
        }

        $this->initialize();

        try {
            foreach ($messages as $message) {
                $encodedMessage = json_encode($message);
                if (false === $encodedMessage) {
                    // Without this check an empty body would be published silently.
                    throw new \RuntimeException(
                        'Unable to JSON-encode message: '.json_last_error_msg(),
                        json_last_error()
                    );
                }

                $this->logger->debug('Writing:' . $encodedMessage);
                // delivery_mode 2 marks the message as persistent.
                $msg = new AMQPMessage($encodedMessage, ['delivery_mode' => 2]);
                // The message name travels as the routing key.
                $this->channel->batch_basic_publish($msg, $this->delayedExchangeName, $message->getName());
            }
            $this->channel->publish_batch();
        } catch (\Exception $exception) {
            $this->logger->error('Error writing delayed messages: '.$exception->getMessage());
            // Chain the original exception so its stack trace is not lost.
            // Assumes WriterException keeps the standard \Exception constructor — confirm.
            throw new WriterException($exception->getMessage(), $exception->getCode(), $exception);
        }
    }

    /**
     * Declares the delayed fanout exchange and the delay queue, then binds them.
     *
     * NOTE(review): the queue name is built from the delay only, while its
     * "x-dead-letter-exchange" argument depends on the target exchange. Two writers
     * using the same delay with different target exchanges would declare the same
     * queue name with different arguments — verify this cannot happen in practice.
     *
     * @return void
     * @throws WriterException When any declaration or the binding fails
     */
    protected function initialize()
    {
        try {
            $delayedQueue = self::DELAY_QUEUE_PREFIX.$this->delay.'Queue';
            $delayInMilliseconds = $this->delay * 1000;

            $this->logger->info('Creating delayed exchange '.$this->delayedExchangeName);
            // Delay Queue
            $this->channel->exchange_declare(
                $this->delayedExchangeName,
                'fanout',
                false, // passive
                true,  // durable
                true   // auto_delete
            );

            $this->logger->info('Creating delayed queue '.$delayedQueue);
            $this->channel->queue_declare(
                $delayedQueue,
                false, // passive
                true,  // durable
                false, // exclusive
                true,  // auto_delete
                false, // nowait
                [
                    // Queue expiry: 5000 ms longer than the message TTL.
                    'x-expires' => ['I', $delayInMilliseconds + 5000],
                    // Messages are held for the delay...
                    'x-message-ttl' => ['I', $delayInMilliseconds],
                    // ...and then dead-lettered to the real target exchange.
                    'x-dead-letter-exchange' => ['S', $this->exchangeName],
                ]
            );
            $this->channel->queue_bind($delayedQueue, $this->delayedExchangeName);
        } catch (\Exception $exception) {
            $this->logger->error('Error configuring delayed queues: '.$exception->getMessage());
            // Chain the original exception so its stack trace is not lost.
            // Assumes WriterException keeps the standard \Exception constructor — confirm.
            throw new WriterException($exception->getMessage(), $exception->getCode(), $exception);
        }
    }
}